Skip to content

Commit 37d070b

Browse files
committed
Util: Add the BulkIndexer helper
This patch adds a bulk indexing helper compoment into the "esutil" package. See code annotations and the bulk_indexer_example_test.go file for information on usage. Closes #137 (cherry picked from commit 56b3816)
1 parent 3925e83 commit 37d070b

9 files changed

+1548
-0
lines changed

esutil/bulk_indexer.go

+544
Large diffs are not rendered by default.

esutil/bulk_indexer_benchmark_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
// +build !integration
6+
7+
package esutil_test
8+
9+
import (
10+
"bytes"
11+
"context"
12+
"io/ioutil"
13+
"net/http"
14+
"strconv"
15+
"strings"
16+
"testing"
17+
18+
"github.com/elastic/go-elasticsearch/v7"
19+
"github.com/elastic/go-elasticsearch/v7/esutil"
20+
)
21+
22+
var mockResponseBody = `{
23+
"took": 30,
24+
"errors": false,
25+
"items": [
26+
{
27+
"index": {
28+
"_index": "test",
29+
"_id": "1",
30+
"_version": 1,
31+
"result": "created",
32+
"_shards": { "total": 2, "successful": 1, "failed": 0 },
33+
"status": 201,
34+
"_seq_no": 0,
35+
"_primary_term": 1
36+
}
37+
}
38+
]
39+
}`
40+
41+
type mockTransp struct{}
42+
43+
func (t *mockTransp) RoundTrip(req *http.Request) (*http.Response, error) {
44+
return &http.Response{Body: ioutil.NopCloser(strings.NewReader(mockResponseBody))}, nil // 1x alloc
45+
}
46+
47+
func BenchmarkBulkIndexer(b *testing.B) {
48+
b.ReportAllocs()
49+
50+
b.Run("Basic", func(b *testing.B) {
51+
b.ResetTimer()
52+
53+
es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransp{}})
54+
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
55+
Client: es,
56+
FlushBytes: 1024,
57+
})
58+
defer bi.Close(context.Background())
59+
60+
docID := make([]byte, 0, 16)
61+
var docIDBuf bytes.Buffer
62+
docIDBuf.Grow(cap(docID))
63+
64+
for i := 0; i < b.N; i++ {
65+
docID = strconv.AppendInt(docID, int64(i), 10)
66+
docIDBuf.Write(docID)
67+
bi.Add(context.Background(), esutil.BulkIndexerItem{
68+
Action: "index",
69+
DocumentID: docIDBuf.String(), // 1x alloc
70+
Body: strings.NewReader(`{"foo":"bar"}`), // 1x alloc
71+
})
72+
docID = docID[:0]
73+
docIDBuf.Reset()
74+
}
75+
})
76+
}

esutil/bulk_indexer_example_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
// +build !integration
6+
7+
package esutil_test
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"log"
13+
"strings"
14+
"time"
15+
16+
"github.com/elastic/go-elasticsearch/v7"
17+
"github.com/elastic/go-elasticsearch/v7/esutil"
18+
)
19+
20+
func ExampleNewBulkIndexer() {
21+
log.SetFlags(0)
22+
23+
// Create the Elasticsearch client
24+
//
25+
es, err := elasticsearch.NewClient(elasticsearch.Config{
26+
// Retry on 429 TooManyRequests statuses
27+
//
28+
RetryOnStatus: []int{502, 503, 504, 429},
29+
30+
// A simple incremental backoff function
31+
//
32+
RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },
33+
34+
// Retry up to 5 attempts
35+
//
36+
MaxRetries: 5,
37+
})
38+
if err != nil {
39+
log.Fatalf("Error creating the client: %s", err)
40+
}
41+
42+
// Create the indexer
43+
//
44+
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
45+
Client: es, // The Elasticsearch client
46+
Index: "test", // The default index name
47+
NumWorkers: 4, // The number of worker goroutines (default: number of CPUs)
48+
FlushBytes: 5e+6, // The flush threshold in bytes (default: 5M)
49+
})
50+
if err != nil {
51+
log.Fatalf("Error creating the indexer: %s", err)
52+
}
53+
54+
// Add an item to the indexer
55+
//
56+
err = indexer.Add(
57+
context.Background(),
58+
esutil.BulkIndexerItem{
59+
// Action field configures the operation to perform (index, create, delete, update)
60+
Action: "index",
61+
62+
// DocumentID is the optional document ID
63+
DocumentID: "1",
64+
65+
// Body is an `io.Reader` with the payload
66+
Body: strings.NewReader(`{"title":"Test"}`),
67+
68+
// OnSuccess is the optional callback for each successful operation
69+
OnSuccess: func(
70+
ctx context.Context,
71+
item esutil.BulkIndexerItem,
72+
res esutil.BulkIndexerResponseItem,
73+
) {
74+
fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
75+
},
76+
77+
// OnFailure is the optional callback for each failed operation
78+
OnFailure: func(
79+
ctx context.Context,
80+
item esutil.BulkIndexerItem,
81+
res esutil.BulkIndexerResponseItem, err error,
82+
) {
83+
if err != nil {
84+
log.Printf("ERROR: %s", err)
85+
} else {
86+
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
87+
}
88+
},
89+
},
90+
)
91+
if err != nil {
92+
log.Fatalf("Unexpected error: %s", err)
93+
}
94+
95+
// Close the indexer channel and flush remaining items
96+
//
97+
if err := indexer.Close(context.Background()); err != nil {
98+
log.Fatalf("Unexpected error: %s", err)
99+
}
100+
101+
// Report the indexer statistics
102+
//
103+
stats := indexer.Stats()
104+
if stats.NumFailed > 0 {
105+
log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
106+
} else {
107+
log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
108+
}
109+
110+
// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
111+
//
112+
// For more information, examples and benchmarks, see:
113+
//
114+
// --> https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk
115+
}
+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
// +build integration
6+
7+
package esutil_test
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"os"
13+
"strconv"
14+
"strings"
15+
"sync/atomic"
16+
"testing"
17+
"time"
18+
19+
"github.com/elastic/go-elasticsearch/v7"
20+
"github.com/elastic/go-elasticsearch/v7/estransport"
21+
"github.com/elastic/go-elasticsearch/v7/esutil"
22+
)
23+
24+
func TestBulkIndexerIntegration(t *testing.T) {
25+
t.Run("Default", func(t *testing.T) {
26+
var countSuccessful uint64
27+
indexName := "test-bulk-integration"
28+
29+
es, _ := elasticsearch.NewClient(elasticsearch.Config{
30+
Logger: &estransport.ColorLogger{Output: os.Stdout},
31+
})
32+
33+
es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true))
34+
es.Indices.Create(
35+
indexName,
36+
es.Indices.Create.WithBody(strings.NewReader(`{"settings": {"number_of_shards": 1, "number_of_replicas": 0, "refresh_interval":"5s"}}`)),
37+
es.Indices.Create.WithWaitForActiveShards("1"))
38+
39+
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
40+
Index: indexName,
41+
Client: es,
42+
// FlushBytes: 3e+6,
43+
})
44+
45+
numItems := 100000
46+
body := `{"body":"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."}`
47+
start := time.Now().UTC()
48+
49+
for i := 1; i <= numItems; i++ {
50+
err := bi.Add(context.Background(), esutil.BulkIndexerItem{
51+
Action: "index",
52+
DocumentID: strconv.Itoa(i),
53+
Body: strings.NewReader(body),
54+
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
55+
atomic.AddUint64(&countSuccessful, 1)
56+
},
57+
})
58+
if err != nil {
59+
t.Fatalf("Unexpected error: %s", err)
60+
}
61+
}
62+
63+
if err := bi.Close(context.Background()); err != nil {
64+
t.Errorf("Unexpected error: %s", err)
65+
}
66+
67+
stats := bi.Stats()
68+
69+
if stats.NumAdded != uint64(numItems) {
70+
t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded)
71+
}
72+
73+
if stats.NumIndexed != uint64(numItems) {
74+
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", numItems, stats.NumIndexed)
75+
}
76+
77+
if stats.NumFailed != 0 {
78+
t.Errorf("Unexpected NumFailed: want=0, got=%d", stats.NumFailed)
79+
}
80+
81+
if countSuccessful != uint64(numItems) {
82+
t.Errorf("Unexpected countSuccessful: want=%d, got=%d", numItems, countSuccessful)
83+
}
84+
85+
fmt.Printf(" Added %d documents to indexer. Succeeded: %d. Failed: %d. Requests: %d. Duration: %s (%.0f docs/sec)\n",
86+
stats.NumAdded,
87+
stats.NumFlushed,
88+
stats.NumFailed,
89+
stats.NumRequests,
90+
time.Since(start).Truncate(time.Millisecond),
91+
1000.0/float64(time.Since(start)/time.Millisecond)*float64(stats.NumFlushed))
92+
})
93+
}

0 commit comments

Comments
 (0)