Skip to content

Commit 9883bdc

Browse files
committed
Fix HTTP headers in the Tasks API
This commit fixes an issue with the Tasks API. When passing a HTTP header in the request, it couldn't be deserialized in the HTTP response. Furthermore, we now add a feature to retrieve the HTTP header from the HTTP response header, not from its body as well. This is described here: https://www.elastic.co/guide/en/elasticsearch/reference/6.4/tasks.html#_identifying_running_tasks. Close #906
1 parent a772572 commit 9883bdc

5 files changed

+94
-29
lines changed

setup_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const (
1717
testIndexName = "elastic-test"
1818
testIndexName2 = "elastic-test2"
1919
testIndexName3 = "elastic-test3"
20+
testIndexName4 = "elastic-test4"
2021
testMapping = `
2122
{
2223
"settings":{

tasks_get_task.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package elastic
33
import (
44
"context"
55
"fmt"
6+
"net/http"
67
"net/url"
78

89
"github.com/olivere/elastic/uritemplates"
@@ -17,6 +18,7 @@ type TasksGetTaskService struct {
1718
pretty bool
1819
taskId string
1920
waitForCompletion *bool
21+
headers http.Header
2022
}
2123

2224
// NewTasksGetTaskService creates a new TasksGetTaskService.
@@ -47,6 +49,15 @@ func (s *TasksGetTaskService) WaitForCompletion(waitForCompletion bool) *TasksGe
4749
return s
4850
}
4951

52+
// Header sets headers on the request
53+
func (s *TasksGetTaskService) Header(name string, value string) *TasksGetTaskService {
54+
if s.headers == nil {
55+
s.headers = http.Header{}
56+
}
57+
s.headers.Add(name, value)
58+
return s
59+
}
60+
5061
// Pretty indicates that the JSON response be indented and human readable.
5162
func (s *TasksGetTaskService) Pretty(pretty bool) *TasksGetTaskService {
5263
s.pretty = pretty
@@ -94,9 +105,10 @@ func (s *TasksGetTaskService) Do(ctx context.Context) (*TasksGetTaskResponse, er
94105

95106
// Get HTTP response
96107
res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
97-
Method: "GET",
98-
Path: path,
99-
Params: params,
108+
Method: "GET",
109+
Path: path,
110+
Params: params,
111+
Headers: s.headers,
100112
})
101113
if err != nil {
102114
return nil, err
@@ -107,10 +119,12 @@ func (s *TasksGetTaskService) Do(ctx context.Context) (*TasksGetTaskResponse, er
107119
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
108120
return nil, err
109121
}
122+
ret.Header = res.Header
110123
return ret, nil
111124
}
112125

113126
type TasksGetTaskResponse struct {
114-
Completed bool `json:"completed"`
115-
Task *TaskInfo `json:"task,omitempty"`
127+
Header http.Header `json:"-"`
128+
Completed bool `json:"completed"`
129+
Task *TaskInfo `json:"task,omitempty"`
116130
}

tasks_get_task_test.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package elastic
66

77
import (
8+
"context"
89
"testing"
910
)
1011

@@ -32,22 +33,48 @@ func TestTasksGetTaskBuildURL(t *testing.T) {
3233
}
3334
}
3435

35-
/*
3636
func TestTasksGetTask(t *testing.T) {
37-
client := setupTestClientAndCreateIndexAndAddDocs(t)
38-
esversion, err := client.ElasticsearchVersion(DefaultURL)
39-
if err != nil {
40-
t.Fatal(err)
41-
}
42-
if esversion < "2.3.0" {
43-
t.Skipf("Elasticsearch %v does not support Tasks Management API yet", esversion)
37+
client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
38+
39+
// Create a reindexing task
40+
var taskID string
41+
{
42+
res, err := client.Reindex().
43+
SourceIndex(testIndexName).
44+
DestinationIndex(testIndexName4).
45+
DoAsync(context.Background())
46+
if err != nil {
47+
t.Fatalf("unable to start reindexing task: %v", err)
48+
}
49+
taskID = res.TaskId
4450
}
45-
res, err := client.TasksGetTask().TaskId("123").Do(context.TODO())
51+
52+
// Get the task by ID
53+
res, err := client.TasksGetTask().
54+
TaskId(taskID).
55+
Header("X-Opaque-Id", "987654").
56+
Do(context.Background())
4657
if err != nil {
4758
t.Fatal(err)
4859
}
4960
if res == nil {
5061
t.Fatal("response is nil")
5162
}
63+
if want, have := "987654", res.Header.Get("X-Opaque-Id"); want != have {
64+
t.Fatalf("expected HTTP header %#v; got: %#v", want, have)
65+
}
66+
if res.Task == nil {
67+
t.Fatal("task is nil")
68+
}
69+
// Elasticsearch <= 6.4.1 doesn't return the X-Opaque-Id in the body,
70+
// only in response header.
71+
/*
72+
have, found := res.Task.Headers["X-Opaque-Id"]
73+
if !found {
74+
t.Fatalf("expected to find headers[%q]", "X-Opaque-Id")
75+
}
76+
if want := "987654"; want != have {
77+
t.Fatalf("expected headers[%q]=%q; got: %q", "X-Opaque-Id", want, have)
78+
}
79+
*/
5280
}
53-
*/

tasks_list.go

+15-13
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,13 @@ func (s *TasksListService) Do(ctx context.Context) (*TasksListResponse, error) {
190190
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
191191
return nil, err
192192
}
193+
ret.Header = res.Header
193194
return ret, nil
194195
}
195196

196197
// TasksListResponse is the response of TasksListService.Do.
197198
type TasksListResponse struct {
199+
Header http.Header `json:"-"`
198200
TaskFailures []*TaskOperationFailure `json:"task_failures"`
199201
NodeFailures []*FailedNodeException `json:"node_failures"`
200202
// Nodes returns the tasks per node. The key is the node id.
@@ -226,19 +228,19 @@ type DiscoveryNode struct {
226228

227229
// TaskInfo represents information about a currently running task.
228230
type TaskInfo struct {
229-
Node string `json:"node"`
230-
Id int64 `json:"id"` // the task id (yes, this is a long in the Java source)
231-
Type string `json:"type"`
232-
Action string `json:"action"`
233-
Status interface{} `json:"status"` // has separate implementations of Task.Status in Java for reindexing, replication, and "RawTaskStatus"
234-
Description interface{} `json:"description"` // same as Status
235-
StartTime string `json:"start_time"`
236-
StartTimeInMillis int64 `json:"start_time_in_millis"`
237-
RunningTime string `json:"running_time"`
238-
RunningTimeInNanos int64 `json:"running_time_in_nanos"`
239-
Cancellable bool `json:"cancellable"`
240-
ParentTaskId string `json:"parent_task_id"` // like "YxJnVYjwSBm_AUbzddTajQ:12356"
241-
Headers http.Header `json:"headers"`
231+
Node string `json:"node"`
232+
Id int64 `json:"id"` // the task id (yes, this is a long in the Java source)
233+
Type string `json:"type"`
234+
Action string `json:"action"`
235+
Status interface{} `json:"status"` // has separate implementations of Task.Status in Java for reindexing, replication, and "RawTaskStatus"
236+
Description interface{} `json:"description"` // same as Status
237+
StartTime string `json:"start_time"`
238+
StartTimeInMillis int64 `json:"start_time_in_millis"`
239+
RunningTime string `json:"running_time"`
240+
RunningTimeInNanos int64 `json:"running_time_in_nanos"`
241+
Cancellable bool `json:"cancellable"`
242+
ParentTaskId string `json:"parent_task_id"` // like "YxJnVYjwSBm_AUbzddTajQ:12356"
243+
Headers map[string]string `json:"headers"`
242244
}
243245

244246
// StartTaskResult is used in cases where a task gets started asynchronously and

tasks_list_test.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ func TestTasksListBuildURL(t *testing.T) {
4646

4747
func TestTasksList(t *testing.T) {
4848
client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
49-
res, err := client.TasksList().Pretty(true).Human(true).Do(context.TODO())
49+
res, err := client.TasksList().
50+
Pretty(true).
51+
Human(true).
52+
Header("X-Opaque-Id", "123456").
53+
Do(context.TODO())
5054
if err != nil {
5155
t.Fatal(err)
5256
}
@@ -56,4 +60,21 @@ func TestTasksList(t *testing.T) {
5660
if len(res.Nodes) == 0 {
5761
t.Fatalf("expected at least 1 node; got: %d", len(res.Nodes))
5862
}
63+
if want, have := "123456", res.Header.Get("X-Opaque-Id"); want != have {
64+
t.Fatalf("expected HTTP header %#v; got: %#v", want, have)
65+
}
66+
for _, node := range res.Nodes {
67+
if len(node.Tasks) == 0 {
68+
t.Fatalf("expected at least 1 task; got: %d", len(node.Tasks))
69+
}
70+
for _, task := range node.Tasks {
71+
have, found := task.Headers["X-Opaque-Id"]
72+
if !found {
73+
t.Fatalf("expected to find headers[%q]", "X-Opaque-Id")
74+
}
75+
if want := "123456"; want != have {
76+
t.Fatalf("expected headers[%q]=%q; got: %q", "X-Opaque-Id", want, have)
77+
}
78+
}
79+
}
5980
}

0 commit comments

Comments
 (0)