-
Notifications
You must be signed in to change notification settings - Fork 611
/
Copy pathelasticsearch.rb
204 lines (184 loc) · 7.99 KB
/
elasticsearch.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require 'elasticsearch/version'
require 'elastic/transport'
require 'elasticsearch/api'
module Elasticsearch
NOT_ELASTICSEARCH_WARNING = 'The client noticed that the server is not Elasticsearch and we do not support this unknown product.'.freeze
SECURITY_PRIVILEGES_VALIDATION_WARNING = 'The client is unable to verify that the server is Elasticsearch due to security privileges on the server side. Some functionality may not be compatible if the server is running an unsupported product.'.freeze
VALIDATION_WARNING = 'The client is unable to verify that the server is Elasticsearch. Some functionality may not be compatible if the server is running an unsupported product.'.freeze
# This is the stateful Elasticsearch::Client, using an instance of elastic-transport.
class Client
include Elasticsearch::API
# The default port to use if connecting using a Cloud ID.
# Updated from 9243 to 443 in client version 7.10.1
#
# @since 7.2.0
DEFAULT_CLOUD_PORT = 443
# Create a client connected to an Elasticsearch cluster.
#
# @param [Hash] arguments - initializer arguments
# @option arguments [String] :cloud_id - The Cloud ID to connect to Elastic Cloud
# @option arguments [String, Hash] :api_key Use API Key Authentication, either the base64 encoding of `id` and `api_key`
# joined by a colon as a String, or a hash with the `id` and `api_key` values.
# @option arguments [String] :opaque_id_prefix set a prefix for X-Opaque-Id when initializing the client.
# This will be prepended to the id you set before each request
# if you're using X-Opaque-Id
# @option arguments [Hash] :headers Custom HTTP Request Headers
#
def initialize(arguments = {}, &block)
@verified = false
@warned = false
@opaque_id_prefix = arguments[:opaque_id_prefix] || nil
api_key(arguments) if arguments[:api_key]
setup_cloud(arguments) if arguments[:cloud_id]
set_user_agent!(arguments) unless sent_user_agent?(arguments)
@transport = Elastic::Transport::Client.new(arguments, &block)
end
def method_missing(name, *args, &block)
if methods.include?(name)
super
elsif name == :perform_request
# The signature for perform_request is:
# method, path, params, body, headers, opts
# The last arg is opts, which shouldn't be sent when `perform_request` is called
# directly. Check if :endpoint is a key, which means it's the extra identifier
# used for OpenTelemetry.
args.pop if args[-1].is_a?(Hash) && args[-1][:endpoint]
if (opaque_id = args[2]&.delete(:opaque_id))
headers = args[4] || {}
opaque_id = @opaque_id_prefix ? "#{@opaque_id_prefix}#{opaque_id}" : opaque_id
args[4] = headers.merge('X-Opaque-Id' => opaque_id)
end
unless @verified
verify_elasticsearch(*args, &block)
else
@transport.perform_request(*args, &block)
end
else
@transport.send(name, *args, &block)
end
end
def respond_to_missing?(method_name, *args)
@transport.respond_to?(method_name) || super
end
private
def verify_elasticsearch(*args, &block)
begin
response = @transport.perform_request(*args, &block)
rescue Elastic::Transport::Transport::Errors::Unauthorized,
Elastic::Transport::Transport::Errors::Forbidden,
Elastic::Transport::Transport::Errors::RequestEntityTooLarge => e
warn(SECURITY_PRIVILEGES_VALIDATION_WARNING)
@verified = true
raise e
rescue Elastic::Transport::Transport::Error => e
unless @warned
warn(VALIDATION_WARNING)
@warned = true
end
raise e
rescue StandardError => e
warn(VALIDATION_WARNING)
raise e
end
raise Elasticsearch::UnsupportedProductError unless response.headers['x-elastic-product'] == 'Elasticsearch'
@verified = true
response
end
def setup_cloud_host(cloud_id, user, password, port)
name = cloud_id.split(':')[0]
cloud_url, elasticsearch_instance = Base64.decode64(cloud_id.gsub("#{name}:", '')).split('$')
if cloud_url.include?(':')
url, port = cloud_url.split(':')
host = "#{elasticsearch_instance}.#{url}"
else
host = "#{elasticsearch_instance}.#{cloud_url}"
port ||= DEFAULT_CLOUD_PORT
end
[{ scheme: 'https', user: user, password: password, host: host, port: port.to_i }]
end
def api_key(arguments)
api_key = if arguments[:api_key].is_a? Hash
encode(arguments[:api_key])
else
arguments[:api_key]
end
arguments.delete(:user)
arguments.delete(:password)
authorization = { 'Authorization' => "ApiKey #{api_key}" }
if (headers = arguments.dig(:transport_options, :headers))
headers.merge!(authorization)
else
arguments[:transport_options] ||= {}
arguments[:transport_options].merge!({ headers: authorization })
end
end
def setup_cloud(arguments)
arguments[:hosts] = setup_cloud_host(
arguments[:cloud_id],
arguments[:user],
arguments[:password],
arguments[:port]
)
end
# Encode credentials for the Authorization Header
# Credentials is the base64 encoding of id and api_key joined by a colon
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html
def encode(api_key)
Base64.strict_encode64([api_key[:id], api_key[:api_key]].join(':'))
end
def elasticsearch_validation_request
@transport.perform_request('GET', '/')
end
def sent_user_agent?(arguments)
return unless (headers = arguments&.[](:transport_options)&.[](:headers))
!!headers.keys.detect { |h| h =~ /user-?_?agent/ }
end
def set_user_agent!(arguments)
user_agent = [
"elasticsearch-ruby/#{Elasticsearch::VERSION}",
"elastic-transport-ruby/#{Elastic::Transport::VERSION}",
"RUBY_VERSION: #{RUBY_VERSION}"
]
if RbConfig::CONFIG && RbConfig::CONFIG['host_os']
user_agent << "#{RbConfig::CONFIG['host_os'].split('_').first[/[a-z]+/i].downcase} #{RbConfig::CONFIG['target_cpu']}"
end
arguments[:transport_options] ||= {}
arguments[:transport_options][:headers] ||= {}
arguments[:transport_options][:headers].merge!({ user_agent: user_agent.join('; ')})
end
end
# Error class for when we detect an unsupported version of Elasticsearch
class UnsupportedProductError < StandardError
def initialize(message = NOT_ELASTICSEARCH_WARNING)
super(message)
end
end
end
# Helper for the meta-header value for Cloud
module Elastic
# If the version is X.X.X.pre/alpha/beta, use X.X.Xp for the meta-header:
def self.client_meta_version
regexp = /^([0-9]+\.[0-9]+\.[0-9]+)\.?([a-z0-9.-]+)?$/
match = Elasticsearch::VERSION.match(regexp)
return "#{match[1]}p" if match[2]
Elasticsearch::VERSION
end
# Constant for elastic-transport meta-header
ELASTICSEARCH_SERVICE_VERSION = [:es, client_meta_version].freeze
end