File tree 6 files changed +110
-1
lines changed
elasticsearch-model/lib/elasticsearch
6 files changed +110
-1
lines changed Original file line number Diff line number Diff line change 13
13
require 'elasticsearch/model/adapters/active_record'
14
14
require 'elasticsearch/model/adapters/mongoid'
15
15
16
+ require 'elasticsearch/model/importing'
16
17
require 'elasticsearch/model/indexing'
17
18
require 'elasticsearch/model/naming'
18
19
require 'elasticsearch/model/serializing'
@@ -80,10 +81,18 @@ def self.included(base)
80
81
include Elasticsearch ::Model ::Serializing ::InstanceMethods
81
82
end
82
83
84
+ # Delegate important methods to the `__elasticsearch__` proxy, unless they are defined already
85
+ #
83
86
extend Support ::Forwardable
84
87
forward :'self.__elasticsearch__' , :search unless respond_to? ( :search )
85
88
forward :'self.__elasticsearch__' , :mapping unless respond_to? ( :mapping )
86
89
forward :'self.__elasticsearch__' , :settings unless respond_to? ( :settings )
90
+ forward :'self.__elasticsearch__' , :import unless respond_to? ( :import )
91
+
92
+ # Mix the importing module into the proxy
93
+ #
94
+ self . __elasticsearch__ . class . __send__ :include , Elasticsearch ::Model ::Importing ::ClassMethods
95
+ self . __elasticsearch__ . class . __send__ :include , Adapter . from_class ( base ) . importing_mixin
87
96
end
88
97
end
89
98
Original file line number Diff line number Diff line change @@ -28,6 +28,10 @@ def callbacks_mixin
28
28
adapter . const_get ( :Callbacks )
29
29
end
30
30
31
# Return the `Importing` module defined by the adapter
#
# @return [Module] The constant `Importing` looked up on `adapter`
#
def importing_mixin
  adapter.const_get(:Importing)
end
34
+
31
35
def adapter
32
36
@adapter ||= begin
33
37
self . class . adapters . find ( lambda { [ ] } ) { |name , condition | condition . call ( klass ) } . first \
Original file line number Diff line number Diff line change @@ -57,6 +57,19 @@ def self.included(base)
57
57
end
58
58
end
59
59
60
module Importing
  # Fetch batches of records from the database and yield each batch
  # converted into a list of bulk `index` actions
  #
  # Use the [`find_in_batches`](http://api.rubyonrails.org/classes/ActiveRecord/Batches.html) method
  #
  # @param options [Hash] Passed to `find_in_batches` as is
  # @yield [Array<Hash>] One `{ index: { _id: ..., data: ... } }` item for every record in the batch
  #
  def __find_in_batches(options = {}, &block)
    find_in_batches(options) do |batch|
      batch_for_bulk = batch.map do |record|
        # Serialize without the root element, so the attributes stay at the top level
        document = record.as_json(root: false)
        document.delete('id')

        # Take the ID from the record itself: the serialized hash has no usable `id`
        # when the model customizes its JSON representation
        { index: { _id: record.id, data: document } }
      end

      yield batch_for_bulk
    end
  end
end
72
+
60
73
end
61
74
62
75
end
Original file line number Diff line number Diff line change @@ -4,7 +4,6 @@ module Adapter
4
4
module Default
5
5
6
6
module Records
7
-
8
7
# Use `ActiveModel#find`
9
8
#
10
9
def records
@@ -16,6 +15,12 @@ module Callbacks
16
15
# noop
17
16
end
18
17
18
module Importing
  # Placeholder for fetching records in batches: this implementation
  # does not fetch anything and fails on every call
  #
  # @raise [NoMethodError] Always
  #
  def __find_in_batches(options = {}, &block)
    message = "Method not implemented for default adapter"
    raise NoMethodError, message
  end
end
23
+
19
24
end
20
25
end
21
26
end
Original file line number Diff line number Diff line change @@ -43,6 +43,33 @@ def self.included(base)
43
43
end
44
44
end
45
45
46
module Importing
  # Fetch batches of records from the database and yield each batch
  # converted into a list of bulk `index` actions
  #
  # @see https://github.com/mongoid/mongoid/issues/1334
  # @see https://github.com/karmi/retire/pull/724
  #
  # @param options [Hash]
  # @option options [Integer] :batch_size Number of records per batch (1,000 when missing)
  # @yield [Array<Hash>] One `{ index: { _id: ..., data: ... } }` item for every record in the batch
  #
  def __find_in_batches(options = {}, &block)
    # Read the batch size without writing the default back into the caller's hash
    batch_size = options[:batch_size] || 1_000

    # `each_slice` also yields the final, incomplete batch and yields nothing at all
    # when there are no records
    all.each_slice(batch_size) do |items|
      batch_for_bulk = items.map { |item| { index: { _id: item.id, data: item.attributes } } }
      yield batch_for_bulk
    end
  end
end
72
+
46
73
end
47
74
48
75
end
Original file line number Diff line number Diff line change
1
module Elasticsearch
  module Model

    # This module provides the support for importing all the records
    # from the including class into the index, batch by batch.
    #
    module Importing

      module ClassMethods

        # When this module is included, include also the `Importing` module
        # of the adapter resolved for the including class
        #
        def self.included(base)
          adapter = Adapter.from_class(base)
          base.__send__ :include, adapter.importing_mixin
        end

        # Import all model records into the index
        #
        # Batches are loaded by the `__find_in_batches` method and every batch
        # is sent with a single `client.bulk` call.
        #
        # @example Import all records into the index
        #
        #     Article.import
        #
        # @example Set the batch size to 100
        #
        #     Article.import(batch_size: 100)
        #
        # @example Process the response from Elasticsearch
        #
        #     Article.import do |response|
        #       puts "Got " + response['items'].select { |i| i['index']['error'] }.size.to_s + " errors"
        #     end
        #
        # @param options [Hash] Options for loading the batches; the hash itself is not modified
        # @option options [Object] :refresh Passed as `refresh` to every `client.bulk` call,
        #                                   and not passed to `__find_in_batches`
        # @yield [response] The value returned by `client.bulk` for each batch
        #
        def import(options = {}, &block)
          refresh = options[:refresh]

          # `:refresh` belongs to the bulk request, not to the batch lookup:
          # ActiveRecord's `find_in_batches` rejects option keys it does not know
          batch_options = options.reject { |name, _value| name == :refresh }

          __find_in_batches(batch_options) do |batch|
            response = client.bulk(index: index_name, type: document_type, body: batch, refresh: refresh)
            yield response if block_given?
          end
        end

      end

    end

  end
end
You can’t perform that action at this time.
0 commit comments