Skip to content

Commit d535d95

Browse files
committed
Refactor SearchRequest, SearchSource and Reindex
This commit refactors the above structs and aligns them with the Java source (see #955). Furthermore, it fixes go vet errors in `example_test.go`.
1 parent 4982266 commit d535d95

5 files changed

+505
-143
lines changed

example_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func Example() {
242242
}
243243
}
244244

245-
func ExampleClient_NewClient_default() {
245+
func ExampleNewClient_default() {
246246
// Obtain a client to the Elasticsearch instance on http://127.0.0.1:9200.
247247
client, err := elastic.NewClient()
248248
if err != nil {
@@ -256,7 +256,7 @@ func ExampleClient_NewClient_default() {
256256
// connected
257257
}
258258

259-
func ExampleClient_NewClient_cluster() {
259+
func ExampleNewClient_cluster() {
260260
// Obtain a client for an Elasticsearch cluster of two nodes,
261261
// running on 10.0.1.1 and 10.0.1.2.
262262
client, err := elastic.NewClient(elastic.SetURL("http://10.0.1.1:9200", "http://10.0.1.2:9200"))
@@ -267,7 +267,7 @@ func ExampleClient_NewClient_cluster() {
267267
_ = client
268268
}
269269

270-
func ExampleClient_NewClient_manyOptions() {
270+
func ExampleNewClient_manyOptions() {
271271
// Obtain a client for an Elasticsearch cluster of two nodes,
272272
// running on 10.0.1.1 and 10.0.1.2. Do not run the sniffer.
273273
// Set the healthcheck interval to 10s. When requests fail,
@@ -287,7 +287,7 @@ func ExampleClient_NewClient_manyOptions() {
287287
_ = client
288288
}
289289

290-
func ExampleIndexExistsService() {
290+
func ExampleIndicesExistsService() {
291291
// Get a client to the local Elasticsearch instance.
292292
client, err := elastic.NewClient()
293293
if err != nil {
@@ -305,7 +305,7 @@ func ExampleIndexExistsService() {
305305
}
306306
}
307307

308-
func ExampleCreateIndexService() {
308+
func ExampleIndicesCreateService() {
309309
// Get a client to the local Elasticsearch instance.
310310
client, err := elastic.NewClient()
311311
if err != nil {
@@ -323,7 +323,7 @@ func ExampleCreateIndexService() {
323323
}
324324
}
325325

326-
func ExampleDeleteIndexService() {
326+
func ExampleIndicesDeleteService() {
327327
// Get a client to the local Elasticsearch instance.
328328
client, err := elastic.NewClient()
329329
if err != nil {
@@ -508,7 +508,7 @@ func ExampleClusterHealthService() {
508508
fmt.Printf("Cluster status is %q\n", res.Status)
509509
}
510510

511-
func ExampleClusterHealthService_WaitForGreen() {
511+
func ExampleClusterHealthService_WaitForStatus() {
512512
client, err := elastic.NewClient()
513513
if err != nil {
514514
panic(err)

reindex.go

+78-87
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package elastic
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"net/url"
1112
)
@@ -214,7 +215,7 @@ func (s *ReindexService) Validate() error {
214215
if s.source == nil {
215216
invalid = append(invalid, "Source")
216217
} else {
217-
if len(s.source.indices) == 0 {
218+
if len(s.source.request.indices) == 0 {
218219
invalid = append(invalid, "Source.Index")
219220
}
220221
}
@@ -353,85 +354,124 @@ func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error)
353354

354355
// ReindexSource specifies the source of a Reindex process.
355356
type ReindexSource struct {
356-
searchType string // default in ES is "query_then_fetch"
357-
indices []string
358-
types []string
359-
routing *string
360-
preference *string
361-
requestCache *bool
362-
scroll string
363-
query Query
364-
sorts []SortInfo
365-
sorters []Sorter
366-
searchSource *SearchSource
367-
remoteInfo *ReindexRemoteInfo
357+
searchType string // default in ES is "query_then_fetch"
358+
request *SearchRequest
359+
/*
360+
indices []string
361+
types []string
362+
routing *string
363+
preference *string
364+
requestCache *bool
365+
scroll string
366+
query Query
367+
sorts []SortInfo
368+
sorters []Sorter
369+
searchSource *SearchSource
370+
*/
371+
remoteInfo *ReindexRemoteInfo
368372
}
369373

370374
// NewReindexSource creates a new ReindexSource.
371375
func NewReindexSource() *ReindexSource {
372-
return &ReindexSource{}
376+
return &ReindexSource{
377+
request: NewSearchRequest(),
378+
}
379+
}
380+
381+
// Request specifies the search request used for source.
382+
func (r *ReindexSource) Request(request *SearchRequest) *ReindexSource {
383+
if request == nil {
384+
r.request = NewSearchRequest()
385+
} else {
386+
r.request = request
387+
}
388+
return r
373389
}
374390

375391
// SearchType is the search operation type. Possible values are
376392
// "query_then_fetch" and "dfs_query_then_fetch".
377393
func (r *ReindexSource) SearchType(searchType string) *ReindexSource {
378-
r.searchType = searchType
394+
r.request = r.request.SearchType(searchType)
379395
return r
380396
}
381397

382398
func (r *ReindexSource) SearchTypeDfsQueryThenFetch() *ReindexSource {
383-
return r.SearchType("dfs_query_then_fetch")
399+
r.request = r.request.SearchType("dfs_query_then_fetch")
400+
return r
384401
}
385402

386403
func (r *ReindexSource) SearchTypeQueryThenFetch() *ReindexSource {
387-
return r.SearchType("query_then_fetch")
404+
r.request = r.request.SearchType("query_then_fetch")
405+
return r
388406
}
389407

390408
func (r *ReindexSource) Index(indices ...string) *ReindexSource {
391-
r.indices = append(r.indices, indices...)
409+
r.request = r.request.Index(indices...)
392410
return r
393411
}
394412

395413
func (r *ReindexSource) Type(types ...string) *ReindexSource {
396-
r.types = append(r.types, types...)
414+
r.request = r.request.Type(types...)
397415
return r
398416
}
399417

400418
func (r *ReindexSource) Preference(preference string) *ReindexSource {
401-
r.preference = &preference
419+
r.request = r.request.Preference(preference)
402420
return r
403421
}
404422

405423
func (r *ReindexSource) RequestCache(requestCache bool) *ReindexSource {
406-
r.requestCache = &requestCache
424+
r.request = r.request.RequestCache(requestCache)
407425
return r
408426
}
409427

410428
func (r *ReindexSource) Scroll(scroll string) *ReindexSource {
411-
r.scroll = scroll
429+
r.request = r.request.Scroll(scroll)
412430
return r
413431
}
414432

415433
func (r *ReindexSource) Query(query Query) *ReindexSource {
416-
r.query = query
434+
r.request = r.request.Query(query)
417435
return r
418436
}
419437

420438
// Sort adds a sort order.
421439
func (r *ReindexSource) Sort(field string, ascending bool) *ReindexSource {
422-
r.sorts = append(r.sorts, SortInfo{Field: field, Ascending: ascending})
440+
r.request = r.request.Sort(field, ascending)
423441
return r
424442
}
425443

426444
// SortWithInfo adds a sort order.
427445
func (r *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
428-
r.sorts = append(r.sorts, info)
446+
r.request = r.request.SortWithInfo(info)
429447
return r
430448
}
431449

432450
// SortBy adds a sort order.
433451
func (r *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
434-
r.sorters = append(r.sorters, sorter...)
452+
r.request = r.request.SortBy(sorter...)
453+
return r
454+
}
455+
456+
// FetchSource indicates whether the response should contain the stored
457+
// _source for every hit.
458+
func (r *ReindexSource) FetchSource(fetchSource bool) *ReindexSource {
459+
r.request = r.request.FetchSource(fetchSource)
460+
return r
461+
}
462+
463+
// FetchSourceIncludeExclude specifies that _source should be returned
464+
// with each hit, where "include" and "exclude" serve as a simple wildcard
465+
// matcher that gets applied to its fields
466+
// (e.g. include := []string{"obj1.*","obj2.*"}, exclude := []string{"description.*"}).
467+
func (r *ReindexSource) FetchSourceIncludeExclude(include, exclude []string) *ReindexSource {
468+
r.request = r.request.FetchSourceIncludeExclude(include, exclude)
469+
return r
470+
}
471+
472+
// FetchSourceContext indicates how the _source should be fetched.
473+
func (r *ReindexSource) FetchSourceContext(fsc *FetchSourceContext) *ReindexSource {
474+
r.request = r.request.FetchSourceContext(fsc)
435475
return r
436476
}
437477

@@ -443,84 +483,35 @@ func (r *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource {
443483

444484
// Source returns a serializable JSON request for the request.
445485
func (r *ReindexSource) Source() (interface{}, error) {
446-
source := make(map[string]interface{})
447-
448-
if r.query != nil {
449-
src, err := r.query.Source()
450-
if err != nil {
451-
return nil, err
452-
}
453-
source["query"] = src
454-
} else if r.searchSource != nil {
455-
src, err := r.searchSource.Source()
456-
if err != nil {
457-
return nil, err
458-
}
459-
source["source"] = src
486+
src, err := r.request.sourceAsMap()
487+
if err != nil {
488+
return nil, err
460489
}
461-
462-
if r.searchType != "" {
463-
source["search_type"] = r.searchType
490+
source, ok := src.(map[string]interface{})
491+
if !ok {
492+
return nil, errors.New("unable to use SearchRequest as map[string]interface{}")
464493
}
465494

466-
switch len(r.indices) {
467-
case 0:
495+
switch len(r.request.indices) {
468496
case 1:
469-
source["index"] = r.indices[0]
497+
source["index"] = r.request.indices[0]
470498
default:
471-
source["index"] = r.indices
499+
source["index"] = r.request.indices
472500
}
473-
474-
switch len(r.types) {
501+
switch len(r.request.types) {
475502
case 0:
476503
case 1:
477-
source["type"] = r.types[0]
504+
source["type"] = r.request.types[0]
478505
default:
479-
source["type"] = r.types
480-
}
481-
482-
if r.preference != nil && *r.preference != "" {
483-
source["preference"] = *r.preference
506+
source["type"] = r.request.types
484507
}
485-
486-
if r.requestCache != nil {
487-
source["request_cache"] = fmt.Sprintf("%v", *r.requestCache)
488-
}
489-
490-
if r.scroll != "" {
491-
source["scroll"] = r.scroll
492-
}
493-
494508
if r.remoteInfo != nil {
495509
src, err := r.remoteInfo.Source()
496510
if err != nil {
497511
return nil, err
498512
}
499513
source["remote"] = src
500514
}
501-
502-
if len(r.sorters) > 0 {
503-
var sortarr []interface{}
504-
for _, sorter := range r.sorters {
505-
src, err := sorter.Source()
506-
if err != nil {
507-
return nil, err
508-
}
509-
sortarr = append(sortarr, src)
510-
}
511-
source["sort"] = sortarr
512-
} else if len(r.sorts) > 0 {
513-
var sortarr []interface{}
514-
for _, sort := range r.sorts {
515-
src, err := sort.Source()
516-
if err != nil {
517-
return nil, err
518-
}
519-
sortarr = append(sortarr, src)
520-
}
521-
source["sort"] = sortarr
522-
}
523-
524515
return source, nil
525516
}
526517

reindex_test.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -260,15 +260,28 @@ func TestReindexSourceWithRouting(t *testing.T) {
260260
}
261261
}
262262

263-
func TestReindex(t *testing.T) {
264-
client := setupTestClientAndCreateIndexAndAddDocs(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))
265-
esversion, err := client.ElasticsearchVersion(DefaultURL)
263+
func TestReindexSourceWithSourceFilter(t *testing.T) {
264+
client := setupTestClient(t)
265+
src := NewReindexSource().Index("twitter").
266+
FetchSourceIncludeExclude([]string{"obj1.*", "obj2.*"}, []string{"*.description"})
267+
dst := NewReindexDestination().Index("new_twitter")
268+
out, err := client.Reindex().Source(src).Destination(dst).getBody()
266269
if err != nil {
267270
t.Fatal(err)
268271
}
269-
if esversion < "2.3.0" {
270-
t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
272+
b, err := json.Marshal(out)
273+
if err != nil {
274+
t.Fatal(err)
275+
}
276+
got := string(b)
277+
want := `{"dest":{"index":"new_twitter"},"source":{"_source":{"excludes":["*.description"],"includes":["obj1.*","obj2.*"]},"index":"twitter"}}`
278+
if got != want {
279+
t.Fatalf("\ngot %s\nwant %s", got, want)
271280
}
281+
}
282+
283+
func TestReindex(t *testing.T) {
284+
client := setupTestClientAndCreateIndexAndAddDocs(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))
272285

273286
sourceCount, err := client.Count(testIndexName).Do(context.TODO())
274287
if err != nil {

0 commit comments

Comments
 (0)