Skip to content

Commit 352af50

Browse files
committed
reindex: Add ability to add HTTP headers to request
Close #1031
1 parent 2243838 commit 352af50

File tree

4 files changed

+50
-27
lines changed

4 files changed

+50
-27
lines changed

delete_by_query.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package elastic
77
import (
88
"context"
99
"fmt"
10+
"net/http"
1011
"net/url"
1112
"strings"
1213

@@ -691,16 +692,17 @@ func (s *DeleteByQueryService) DoAsync(ctx context.Context) (*StartTaskResult, e
691692
// BulkIndexByScrollResponse is the outcome of executing Do with
692693
// DeleteByQueryService and UpdateByQueryService.
693694
type BulkIndexByScrollResponse struct {
694-
Took int64 `json:"took"`
695-
SliceId *int64 `json:"slice_id,omitempty"`
696-
TimedOut bool `json:"timed_out"`
697-
Total int64 `json:"total"`
698-
Updated int64 `json:"updated,omitempty"`
699-
Created int64 `json:"created,omitempty"`
700-
Deleted int64 `json:"deleted"`
701-
Batches int64 `json:"batches"`
702-
VersionConflicts int64 `json:"version_conflicts"`
703-
Noops int64 `json:"noops"`
695+
Header http.Header `json:"-"`
696+
Took int64 `json:"took"`
697+
SliceId *int64 `json:"slice_id,omitempty"`
698+
TimedOut bool `json:"timed_out"`
699+
Total int64 `json:"total"`
700+
Updated int64 `json:"updated,omitempty"`
701+
Created int64 `json:"created,omitempty"`
702+
Deleted int64 `json:"deleted"`
703+
Batches int64 `json:"batches"`
704+
VersionConflicts int64 `json:"version_conflicts"`
705+
Noops int64 `json:"noops"`
704706
Retries struct {
705707
Bulk int64 `json:"bulk"`
706708
Search int64 `json:"search"`

reindex.go

+23-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"errors"
1010
"fmt"
11+
"net/http"
1112
"net/url"
1213
)
1314

@@ -28,6 +29,7 @@ type ReindexService struct {
2829
conflicts string
2930
size *int
3031
script *Script
32+
headers http.Header
3133
}
3234

3335
// NewReindexService creates a new ReindexService.
@@ -87,6 +89,15 @@ func (s *ReindexService) WaitForCompletion(waitForCompletion bool) *ReindexServi
8789
return s
8890
}
8991

92+
// Header sets headers on the request
93+
func (s *ReindexService) Header(name string, value string) *ReindexService {
94+
if s.headers == nil {
95+
s.headers = http.Header{}
96+
}
97+
s.headers.Add(name, value)
98+
return s
99+
}
100+
90101
// Pretty indicates that the JSON response be indented and human readable.
91102
func (s *ReindexService) Pretty(pretty bool) *ReindexService {
92103
s.pretty = pretty
@@ -286,10 +297,11 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er
286297

287298
// Get HTTP response
288299
res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
289-
Method: "POST",
290-
Path: path,
291-
Params: params,
292-
Body: body,
300+
Method: "POST",
301+
Path: path,
302+
Params: params,
303+
Body: body,
304+
Headers: s.headers,
293305
})
294306
if err != nil {
295307
return nil, err
@@ -300,6 +312,7 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er
300312
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
301313
return nil, err
302314
}
315+
ret.Header = res.Header
303316
return ret, nil
304317
}
305318

@@ -333,10 +346,11 @@ func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error)
333346

334347
// Get HTTP response
335348
res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
336-
Method: "POST",
337-
Path: path,
338-
Params: params,
339-
Body: body,
349+
Method: "POST",
350+
Path: path,
351+
Params: params,
352+
Body: body,
353+
Headers: s.headers,
340354
})
341355
if err != nil {
342356
return nil, err
@@ -347,6 +361,7 @@ func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error)
347361
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
348362
return nil, err
349363
}
364+
ret.Header = res.Header
350365
return ret, nil
351366
}
352367

reindex_test.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,12 @@ func TestReindex(t *testing.T) {
302302
// Simple copying
303303
src := NewReindexSource().Index(testIndexName)
304304
dst := NewReindexDestination().Index(testIndexName2)
305-
res, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Do(context.TODO())
305+
res, err := client.Reindex().
306+
Source(src).
307+
Destination(dst).
308+
Refresh("true").
309+
Header("X-Opaque-Id", "987654").
310+
Do(context.TODO())
306311
if err != nil {
307312
t.Fatal(err)
308313
}
@@ -318,6 +323,9 @@ func TestReindex(t *testing.T) {
318323
if res.Created != sourceCount {
319324
t.Errorf("expected %d, got %d", sourceCount, res.Created)
320325
}
326+
if want, have := "987654", res.Header.Get("X-Opaque-Id"); want != have {
327+
t.Fatalf("expected HTTP header %#v; got: %#v", want, have)
328+
}
321329

322330
targetCount, err = client.Count(testIndexName2).Do(context.TODO())
323331
if err != nil {
@@ -330,13 +338,6 @@ func TestReindex(t *testing.T) {
330338

331339
func TestReindexAsync(t *testing.T) {
332340
client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
333-
esversion, err := client.ElasticsearchVersion(DefaultURL)
334-
if err != nil {
335-
t.Fatal(err)
336-
}
337-
if esversion < "2.3.0" {
338-
t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
339-
}
340341

341342
sourceCount, err := client.Count(testIndexName).Do(context.TODO())
342343
if err != nil {
@@ -361,6 +362,7 @@ func TestReindexAsync(t *testing.T) {
361362
Source(src).
362363
Destination(dst).
363364
Slices("auto").
365+
Header("X-Opaque-Id", "987654").
364366
DoAsync(context.TODO())
365367
if err != nil {
366368
t.Fatal(err)
@@ -371,6 +373,9 @@ func TestReindexAsync(t *testing.T) {
371373
if res.TaskId == "" {
372374
t.Errorf("expected a task id, got %+v", res)
373375
}
376+
if want, have := "987654", res.Header.Get("X-Opaque-Id"); want != have {
377+
t.Fatalf("expected HTTP header %#v; got: %#v", want, have)
378+
}
374379

375380
tasksGetTask := client.TasksGetTask()
376381
taskStatus, err := tasksGetTask.TaskId(res.TaskId).Do(context.TODO())

tasks_list.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -246,5 +246,6 @@ type TaskInfo struct {
246246
// StartTaskResult is used in cases where a task gets started asynchronously and
247247
// the operation simply returnes a TaskID to watch for via the Task Management API.
248248
type StartTaskResult struct {
249-
TaskId string `json:"task"`
249+
Header http.Header `json:"-"`
250+
TaskId string `json:"task"`
250251
}

0 commit comments

Comments
 (0)