Skip to content

Allow custom transformations to be defined for bulk imports. #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ def __find_in_batches(options={}, &block)
scope = named_scope ? self.__send__(named_scope) : self

scope.find_in_batches(options) do |batch|
batch_for_bulk = batch.map { |a| { index: { _id: a.id, data: a.__elasticsearch__.as_indexed_json } } }
yield batch_for_bulk
yield batch
end
end
end

def __transform
lambda {|model| { index: { _id: model.id, data: model.__elasticsearch__.as_indexed_json } }}
end
end
end

end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ module Importing
def __find_in_batches(options={}, &block)
raise NotImplemented, "Method not implemented for default adapter"
end

# @abstract Implement this method in your adapter
#
def __transform
raise NotImplemented, "Method not implemented for default adapter"
end
end

end
Expand Down
10 changes: 6 additions & 4 deletions elasticsearch-model/lib/elasticsearch/model/adapters/mongoid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,19 @@ def __find_in_batches(options={}, &block)
items << item

if items.length % options[:batch_size] == 0
batch_for_bulk = items.map { |a| { index: { _id: a.id.to_s, data: a.as_indexed_json } } }
yield batch_for_bulk
yield items
items = []
end
end

unless items.empty?
batch_for_bulk = items.map { |a| { index: { _id: a.id.to_s, data: a.as_indexed_json } } }
yield batch_for_bulk
yield items
end
end

def __transform
lambda {|a| { index: { _id: a.id.to_s, data: a.as_indexed_json } }}
end
end

end
Expand Down
24 changes: 20 additions & 4 deletions elasticsearch-model/lib/elasticsearch/model/importing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,24 @@ module ClassMethods
#
# Article.import scope: 'published'
#
# @example Customize how each record is [bulk imported](https://github.com/elasticsearch/elasticsearch-ruby/blob/master/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb)
#
# transform = lambda do |article|
# {index: {_id: article.id, _parent: article.author_id, data: article.__elasticsearch__.as_indexed_json}}
# end
#
# Article.import transform: transform
#
def import(options={}, &block)
errors = 0
refresh = options.delete(:refresh) || false
target_index = options.delete(:index) || index_name
target_type = options.delete(:type) || document_type
refresh = options.delete(:refresh) || false
target_index = options.delete(:index) || index_name
target_type = options.delete(:type) || document_type
transform = options.delete(:transform) || __transform

if !transform.respond_to?(:call)
raise ArgumentError, "You must pass an object that supports #call method, #{transform.class} given"
end

if options.delete(:force)
self.create_index! force: true, index: target_index
Expand All @@ -82,7 +95,7 @@ def import(options={}, &block)
response = client.bulk \
index: target_index,
type: target_type,
body: batch
body: __batch_to_bulk(batch, transform)

yield response if block_given?

Expand All @@ -94,6 +107,9 @@ def import(options={}, &block)
return errors
end

def __batch_to_bulk(batch, transform)
batch.map {|model| transform.call(model)}
end
end

end
Expand Down
14 changes: 14 additions & 0 deletions elasticsearch-model/test/unit/adapter_active_record_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ def ids
DummyClassForActiveRecord.__find_in_batches(scope: :published) do; end
end

context "when transforming models" do
setup do
@transform = DummyClassForActiveRecord.__transform
end

should "provide an object that responds to #call" do
assert_respond_to @transform, :call
end

should "provide basic transformation" do
model = mock("model", id: 1, __elasticsearch__: stub(as_indexed_json: {}))
assert_equal @transform.call(model), { index: { _id: 1, data: {} } }
end
end
end
end
end
18 changes: 14 additions & 4 deletions elasticsearch-model/test/unit/adapter_default_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ class ::DummyClassForDefaultAdapter; end
assert_instance_of Module, Elasticsearch::Model::Adapter::Default::Callbacks
end

should "have the default Importing implementation" do
DummyClassForDefaultAdapter.__send__ :include, Elasticsearch::Model::Adapter::Default::Importing
context "concerning abstract methods" do
setup do
DummyClassForDefaultAdapter.__send__ :include, Elasticsearch::Model::Adapter::Default::Importing
end

should "have the default Importing implementation" do
assert_raise Elasticsearch::Model::NotImplemented do
DummyClassForDefaultAdapter.new.__find_in_batches
end
end

assert_raise Elasticsearch::Model::NotImplemented do
DummyClassForDefaultAdapter.new.__find_in_batches
should "have the default transform implementation" do
assert_raise Elasticsearch::Model::NotImplemented do
DummyClassForDefaultAdapter.new.__transform
end
end
end

Expand Down
15 changes: 15 additions & 0 deletions elasticsearch-model/test/unit/adapter_mongoid_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ def ids
DummyClassForMongoid.__send__ :extend, Elasticsearch::Model::Adapter::Mongoid::Importing
DummyClassForMongoid.__find_in_batches do; end
end

context "when transforming models" do
setup do
@transform = DummyClassForMongoid.__transform
end

should "provide an object that responds to #call" do
assert_respond_to @transform, :call
end

should "provide basic transformation" do
model = mock("model", id: 1, as_indexed_json: {})
assert_equal @transform.call(model), { index: { _id: "1", data: {} } }
end
end
end

end
Expand Down
35 changes: 34 additions & 1 deletion elasticsearch-model/test/unit/importing_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ module ImportingMixin
def __find_in_batches(options={}, &block)
yield if block_given?
end
def __transform
lambda {|a|}
end
end

def importing_mixin
Expand Down Expand Up @@ -41,7 +44,7 @@ def importing_mixin
DummyImportingModel.expects(:client).returns(client)
DummyImportingModel.expects(:index_name).returns('foo')
DummyImportingModel.expects(:document_type).returns('foo')

DummyImportingModel.stubs(:__batch_to_bulk)
assert_equal 0, DummyImportingModel.import
end

Expand All @@ -58,6 +61,7 @@ def importing_mixin
DummyImportingModel.stubs(:client).returns(client)
DummyImportingModel.stubs(:index_name).returns('foo')
DummyImportingModel.stubs(:document_type).returns('foo')
DummyImportingModel.stubs(:__batch_to_bulk)

assert_equal 1, DummyImportingModel.import
end
Expand All @@ -75,6 +79,7 @@ def importing_mixin
DummyImportingModel.stubs(:client).returns(client)
DummyImportingModel.stubs(:index_name).returns('foo')
DummyImportingModel.stubs(:document_type).returns('foo')
DummyImportingModel.stubs(:__batch_to_bulk)

DummyImportingModel.import do |response|
assert_equal 2, response['items'].size
Expand Down Expand Up @@ -116,8 +121,36 @@ def importing_mixin
.returns({'items' => [ {'index' => {} }]})

DummyImportingModel.stubs(:client).returns(client)
DummyImportingModel.stubs(:__batch_to_bulk)

DummyImportingModel.import index: 'my-new-index', type: 'my-other-type'
end

should "default to the adapter's bulk transform" do
client = mock('client', bulk: {'items' => []})
transform = lambda {|a|}

DummyImportingModel.stubs(:client).returns(client)
DummyImportingModel.expects(:__transform).returns(transform)
DummyImportingModel.expects(:__batch_to_bulk).with(anything, transform)

DummyImportingModel.import index: 'foo', type: 'bar'
end

should "use the optioned transform" do
client = mock('client', bulk: {'items' => []})
transform = lambda {|a|}

DummyImportingModel.stubs(:client).returns(client)
DummyImportingModel.expects(:__batch_to_bulk).with(anything, transform)

DummyImportingModel.import index: 'foo', type: 'bar', transform: transform
end

should "raise an ArgumentError if transform is an object that doesn't respond to #call" do
assert_raise ArgumentError do
DummyImportingModel.import index: 'foo', type: 'bar', transform: "not_callable"
end
end
end
end