Skip to content

Commit cc1c947

Browse files
committed
[STORE] Added the find_in_batches method for models
Person.find_in_batches { |batch| puts batch.map(&:name) } See: http://api.rubyonrails.org/classes/ActiveRecord/Batches.html#method-i-find_in_batches
1 parent 2b74c05 commit cc1c947

File tree

3 files changed

+125
-2
lines changed

3 files changed

+125
-2
lines changed

elasticsearch-persistence/lib/elasticsearch/persistence/model/find.rb

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module Model
55
module Find
66
module ClassMethods
77

8-
# Return all documents (up to 10,000) for a model
8+
# Returns all models (up to 10,000)
99
#
1010
# @example Retrieve all people
1111
#
@@ -20,6 +20,81 @@ module ClassMethods
2020
def all(options={})
2121
gateway.search( { query: { match_all: {} }, size: 10_000 }.merge(options) )
2222
end
23+
24+
# Returns all models efficiently via the Elasticsearch's scan/scroll API
25+
#
26+
# You can restrict the models being returned with a query.
27+
#
28+
# The full {Persistence::Repository::Response::Result} instance is yielded to the passed
29+
# block in each batch, so you can access any of its properties. Calling `to_a` will
30+
# convert the object to an Array of model instances.
31+
#
32+
# @example Return all models in batches of 20 x number of primary shards
33+
#
34+
# Person.find_in_batches { |batch| puts batch.map(&:name) }
35+
#
36+
# @example Return all models in batches of 100 x number of primary shards
37+
#
38+
# Person.find_in_batches(size: 100) { |batch| puts batch.map(&:name) }
39+
#
40+
# @example Return all models matching a specific query
41+
#
42+
# Person.find_in_batches(query: { match: { name: 'test' } }) { |batch| puts batch.map(&:name) }
43+
#
44+
# @example Return all models, fetching only the `name` attribute from Elasticsearch
45+
#
46+
# Person.find_in_batches( _source_include: 'name') { |_| puts _.response.hits.hits.map(&:to_hash) }
47+
#
48+
# @return [String] The `scroll_id` for the request
49+
#
50+
def find_in_batches(options={}, &block)
51+
search_params = options.extract!(
52+
:index,
53+
:type,
54+
:scroll,
55+
:size,
56+
:explain,
57+
:ignore_indices,
58+
:ignore_unavailable,
59+
:allow_no_indices,
60+
:expand_wildcards,
61+
:preference,
62+
:q,
63+
:routing,
64+
:source,
65+
:_source,
66+
:_source_include,
67+
:_source_exclude,
68+
:stats,
69+
:timeout)
70+
71+
scroll = search_params.delete(:scroll) || '5m'
72+
73+
body = options
74+
75+
# Get the initial scroll_id
76+
#
77+
response = gateway.client.search( { index: gateway.index_name,
78+
type: gateway.document_type,
79+
search_type: 'scan',
80+
scroll: scroll,
81+
size: 20,
82+
body: body }.merge(search_params) )
83+
84+
# Get the initial batch of documents
85+
#
86+
response = gateway.client.scroll( { scroll_id: response['_scroll_id'], scroll: scroll } )
87+
88+
# Break when receiving an empty array of hits
89+
#
90+
while response['hits']['hits'].any? do
91+
yield Repository::Response::Results.new(gateway, response)
92+
93+
response = gateway.client.scroll( { scroll_id: response['_scroll_id'], scroll: scroll } )
94+
end
95+
96+
return response['_scroll_id']
97+
end
2398
end
2499
end
25100

elasticsearch-persistence/test/integration/model/model_basic_test.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ class ::Person
2626
end
2727

2828
context "A basic persistence model" do
29+
setup do
30+
Person.gateway.create_index! force: true
31+
end
32+
2933
should "save and find the object" do
3034
person = Person.new name: 'John Smith', birthday: Date.parse('1970-01-01')
3135
person.save
@@ -94,6 +98,20 @@ class ::Person
9498

9599
assert people.map_with_hit { |o,h| h._score }.all? { |s| s > 0 }
96100
end
101+
102+
should "find instances in batches" do
103+
100.times { |i| Person.create name: "John #{i+1}" }
104+
Person.gateway.refresh_index!
105+
106+
@results = []
107+
108+
Person.find_in_batches(_source_include: 'name') do |batch|
109+
@results += batch.map(&:name)
110+
end
111+
112+
assert_equal 100, @results.size
113+
assert_contains @results, 'John 1'
114+
end
97115
end
98116

99117
end

elasticsearch-persistence/test/unit/model_find_test.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def set_id(id); self.id = id; end
3434
end
3535

3636
setup do
37-
@gateway = stub
37+
@gateway = stub(client: stub(), index_name: 'foo', document_type: 'bar')
3838
DummyFindModel.stubs(:gateway).returns(@gateway)
3939

4040
@response = MultiJson.load <<-JSON
@@ -83,5 +83,35 @@ def set_id(id); self.id = id; end
8383
DummyFindModel.all( { query: { match: { title: 'test' } }, routing: 'abc123' } )
8484
end
8585

86+
should "find all records in batches" do
87+
@gateway
88+
.expects(:deserialize)
89+
.with('_source' => {'foo' => 'bar'})
90+
.returns('_source' => {'foo' => 'bar'})
91+
92+
@gateway.client
93+
.expects(:search)
94+
.with do |arguments|
95+
assert_equal 'scan', arguments[:search_type]
96+
assert_equal 'foo', arguments[:index]
97+
assert_equal 'bar', arguments[:type]
98+
end
99+
.returns(MultiJson.load('{"_scroll_id":"abc123==", "hits":{"hits":[]}}'))
100+
101+
@gateway.client
102+
.expects(:scroll)
103+
.twice
104+
.returns(MultiJson.load('{"_scroll_id":"abc456==", "hits":{"hits":[{"_source":{"foo":"bar"}}]}}'))
105+
.then
106+
.returns(MultiJson.load('{"_scroll_id":"abc789==", "hits":{"hits":[]}}'))
107+
108+
@doc = nil
109+
110+
result = DummyFindModel.find_in_batches { |batch| @doc = batch.first['_source']['foo'] }
111+
112+
assert_equal 'abc789==', result
113+
assert_equal 'bar', @doc
114+
end
115+
86116
end
87117
end

0 commit comments

Comments
 (0)