From 621fe38ebd0310f98d007f2756326ce4583c189f Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Wed, 11 Apr 2018 15:53:23 -0700 Subject: [PATCH] WIP --- lib/elasticsearch/cluster/api/api.ex | 18 +++ lib/elasticsearch/cluster/api/http.ex | 84 ++++++++++ lib/elasticsearch/cluster/cluster.ex | 224 ++++++++++++++++++++++++++ mix.exs | 3 +- mix.lock | 1 + 5 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 lib/elasticsearch/cluster/api/api.ex create mode 100644 lib/elasticsearch/cluster/api/http.ex create mode 100644 lib/elasticsearch/cluster/cluster.ex diff --git a/lib/elasticsearch/cluster/api/api.ex b/lib/elasticsearch/cluster/api/api.ex new file mode 100644 index 0000000..11b0ab5 --- /dev/null +++ b/lib/elasticsearch/cluster/api/api.ex @@ -0,0 +1,18 @@ +defmodule Elasticsearch.Cluster.API do + @moduledoc """ + Defines the necessary callbacks for integrating with the Elasticsearch + JSON API. + """ + + @type url :: String.t() + @type data :: map | Keyword.t() + @type opts :: Keyword.t() + @type config :: map + @type method :: :get | :put | :post | :delete + + @type response :: + {:ok, HTTPoison.Response.t() | HTTPoison.AsyncResponse.t()} + | {:error, HTTPoison.Error.t()} + + @callback request(config, method, url, data, opts) :: response +end diff --git a/lib/elasticsearch/cluster/api/http.ex b/lib/elasticsearch/cluster/api/http.ex new file mode 100644 index 0000000..8769b62 --- /dev/null +++ b/lib/elasticsearch/cluster/api/http.ex @@ -0,0 +1,84 @@ +defmodule Elasticsearch.Cluster.API.HTTP do + @behaviour Elasticsearch.Cluster.API + + @impl true + def request(config, method, url, data, opts) do + method + |> HTTPoison.request( + process_url(url, config), + process_request_body(data, config), + headers(config), + opts + ) + |> process_response(config) + end + + # Respect absolute URLs if passed + defp process_url("http" <> _rest = url, _config) do + url + end + + # On relative urls, prepend the configured base URL + defp process_url(url, config) do + Path.join(config.url, url) + end + + # Converts the request body into JSON, unless it has already + # been converted + defp process_request_body(data, _config) when is_binary(data) do + data + end + + defp process_request_body(data, config) when is_map(data) do + json_library(config).encode!(data) + end + + # Converts the response body string from JSON into a map, if it looks like it + # is actually JSON + defp process_response({:ok, %{body: body} = response}, config) do + body = + cond do + json?(body) -> json_library(config).decode!(body) + true -> body + end + + {:ok, %{response | body: body}} + end + + defp process_response(response, _config) do + response + end + + defp json?(str) when is_binary(str) do + str =~ ~r/^\{/ || str =~ ~r/^\[/ + end + + # Produces request headers for the request, based on the configuration + defp headers(config) do + headers = [{"Content-Type", "application/json"}] + + credentials = http_basic_credentials(config) + + if credentials do + [{"Authorization", "Basic #{credentials}"} | headers] + else + headers + end + end + + defp http_basic_credentials(%{username: username, password: password}) do + Base.encode64("#{username}:#{password}") + end + + defp http_basic_credentials(_config) do + nil + end + + defp json_library(%{json_library: json_library}) do + json_library + end + + defp json_library(_config) do + Poison + end +end diff --git a/lib/elasticsearch/cluster/cluster.ex b/lib/elasticsearch/cluster/cluster.ex new file mode 100644 index 0000000..583de3e --- /dev/null +++ b/lib/elasticsearch/cluster/cluster.ex @@ -0,0 +1,224 @@ +defmodule Elasticsearch.Cluster do + @type response :: {:ok, map} | {:error, Elasticsearch.Exception.t()} + @type index_name :: String.t() + @type url :: String.t() + @type opts :: Keyword.t() + @type data :: map + + # Might not need to be callbacks here + # alias Elasticsearch.Document + # @callback put_document(Document.t(), index_name) :: response + # @callback put_document!(Document.t(), index_name) :: map | no_return + # @callback delete_document(Document.t(), index_name) :: response + # @callback delete_document!(Document.t(), index_name) :: map | no_return + # @callback wait_for_boot(tries :: integer) :: response | {:error, RuntimeError.t()} + + @callback init(map) :: {:ok, map} | {:error, any} + + @callback get(url) :: response + @callback get(url, opts) :: response + @callback get!(url) :: map | no_return + @callback get!(url, opts) :: map | no_return + + @callback put(url, data) :: response + @callback put(url, data, opts) :: response + @callback put!(url, data) :: map | no_return + @callback put!(url, data, opts) :: map | no_return + + @callback post(url, data) :: response + @callback post(url, data, opts) :: response + @callback post!(url, data) :: map | no_return + @callback post!(url, data, opts) :: map | no_return + + @callback delete(url) :: response + @callback delete(url, opts) :: response + @callback delete!(url) :: map | no_return + @callback delete!(url, opts) :: map | no_return + + defmacro __using__(opts) do + quote do + use GenServer + + alias Elasticsearch.Cluster + + import Cluster, only: [format: 1, unwrap!: 1] + + @behaviour Cluster + + @impl Cluster + def get(url, opts \\ []) do + config = __config__() + + config + |> config.api.request(:get, url, "", opts) + |> format() + end + + @impl Cluster + def get!(url, opts \\ []) do + url + |> get(opts) + |> unwrap!() + end + + @impl Cluster + def put(url, data, opts \\ []) do + config = __config__() + + config + |> config.api.request(:put, url, data, opts) + |> format() + end + + @impl Cluster + def put!(url, data, opts \\ []) do + url + |> put(data, opts) + |> unwrap!() + end + + @impl Cluster + def post(url, data, opts \\ []) do + config = __config__() + + config + |> config.api.request(:post, url, data, opts) + |> format() + end + + @impl Cluster + def post!(url, data, opts \\ []) do + url + |> post(data, opts) + |> unwrap!() + end + + @impl Cluster + def delete(url, opts \\ []) do + config = __config__() + + config + |> config.api.request(:delete, url, "", opts) + |> format() + end + + @impl Cluster + def delete!(url, opts \\ []) do + url + |> delete(opts) + |> unwrap!() + end + + ### + # GenServer + ### + + # Cache configuration into the state of the GenServer so that + # we aren't running potentially expensive logic to load configuration + # on each function call. + def start_link(config \\ []) do + app_config = + unquote(opts[:otp_app]) + |> Application.get_env(__MODULE__, []) + |> Enum.into(%{}) + + config = Map.merge(app_config, Enum.into(config, %{})) + + # Ensure that the configuration is validated on startup + with {:ok, pid} <- GenServer.start_link(__MODULE__, config, name: __MODULE__), + :ok <- GenServer.call(pid, :validate) do + {:ok, pid} + else + error -> + GenServer.stop(__MODULE__) + error + end + end + + @impl GenServer + def init(config), do: {:ok, config} + + @doc false + def __config__ do + GenServer.call(__MODULE__, :config) + end + + @impl GenServer + @doc false + def handle_call(:config, _from, config) do + {:reply, config, config} + end + + def handle_call(:validate, _from, config) do + case Cluster.validate_config(config) do + {:ok, _config} -> + {:reply, :ok, config} + + error -> + {:reply, error, config} + end + end + + defoverridable init: 1 + end + end + + @doc false + def validate_config(config) do + with {:ok, _config} <- + Vex.validate( + config, + url: &(is_binary(&1) && String.starts_with?(&1, "http")), + username: [presence: [unless: &(&1[:password] == nil)]], + password: [presence: [unless: &(&1[:username] == nil)]], + api: [presence: true, by: &is_atom/1], + json_library: [presence: true, by: &is_atom/1], + bulk_page_size: [presence: true, by: &is_integer/1], + bulk_wait_interval: [presence: true, by: &is_integer/1] + ), + :ok <- validate_indexes(config[:indexes] || []) do + {:ok, config} + end + end + + def validate_indexes(indexes) do + invalid = + indexes + |> Enum.map(&validate_index_config/1) + |> Enum.reject(&match?({:ok, _}, &1)) + |> Enum.map(&elem(&1, 1)) + + if length(invalid) == 0 do + :ok + else + {:error, invalid} + end + end + + @doc false + def validate_index_config(index) do + Vex.validate( + index, + settings: [presence: true, by: &is_binary/1], + store: [presence: true, by: &is_atom/1], + sources: [presence: true, by: &Enum.map(&1, fn source -> is_atom(source) end)] + ) + end + + @doc false + def format({:ok, %{status_code: code, body: body}}) + when code >= 200 and code < 300 do + {:ok, body} + end + + def format({:ok, %{body: body}}) do + error = Elasticsearch.Exception.exception(response: body) + {:error, error} + end + + def format(error), do: error + + @doc false + def unwrap!({:ok, value}), do: value + def unwrap!({:error, exception}), do: raise(exception) +end diff --git a/mix.exs b/mix.exs index 078ff53..289c103 100644 --- a/mix.exs +++ b/mix.exs @@ -58,6 +58,7 @@ defmodule Elasticsearch.Mixfile do [ {:poison, ">= 0.0.0", optional: true}, {:httpoison, ">= 0.0.0"}, + {:vex, "~> 0.6.0"}, {:dialyze, ">= 0.0.0", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: [:dev, :test]}, {:excoveralls, ">= 0.0.0", only: :test} @@ -92,4 +93,4 @@ defmodule Elasticsearch.Mixfile do ] ] end -end \ No newline at end of file +end diff --git a/mix.lock b/mix.lock index 5512536..48bd804 100644 --- a/mix.lock +++ b/mix.lock @@ -15,4 +15,5 @@ "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"}, + "vex": {:hex, :vex, "0.6.0", "4e79b396b2ec18cd909eed0450b19108d9631842598d46552dc05031100b7a56", [:mix], [], "hexpm"}, }