// Licensed to Elasticsearch B.V. under one or more agreements. // Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. // +build ignore // This example demonstrates indexing documents using the esutil.BulkIndexer helper. // // You can configure the settings with command line flags: // // go run indexer.go --workers=8 --count=100000 --flush=1000000 // package main import ( "bytes" "context" "encoding/json" "flag" "log" "math/rand" "runtime" "strconv" "strings" "sync/atomic" "time" "github.com/cenkalti/backoff/v4" "github.com/dustin/go-humanize" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/go-elasticsearch/v8/esutil" ) type Article struct { ID int `json:"id"` Title string `json:"title"` Body string `json:"body"` Published time.Time `json:"published"` Author Author `json:"author"` } type Author struct { FirstName string `json:"first_name"` LastName string `json:"last_name"` } var ( indexName string numWorkers int flushBytes int numItems int ) func init() { flag.StringVar(&indexName, "index", "test-bulk-example", "Index name") flag.IntVar(&numWorkers, "workers", runtime.NumCPU(), "Number of indexer workers") flag.IntVar(&flushBytes, "flush", 5e+6, "Flush threshold in bytes") flag.IntVar(&numItems, "count", 10000, "Number of documents to generate") flag.Parse() rand.Seed(time.Now().UnixNano()) } func main() { log.SetFlags(0) var ( articles []*Article countSuccessful uint64 res *esapi.Response err error ) log.Printf( "\x1b[1mBulkIndexer\x1b[0m: documents [%s] workers [%d] flush [%s]", humanize.Comma(int64(numItems)), numWorkers, humanize.Bytes(uint64(flushBytes))) log.Println(strings.Repeat("▁", 65)) // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // // Use a third-party package for implementing the backoff function // retryBackoff := backoff.NewExponentialBackOff() // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // // Create the Elasticsearch client // // NOTE: For optimal performance, consider using a third-party HTTP transport package. // See an example in the "benchmarks" folder. // es, err := elasticsearch.NewClient(elasticsearch.Config{ // Retry on 429 TooManyRequests statuses // RetryOnStatus: []int{502, 503, 504, 429}, // Configure the backoff function // RetryBackoff: func(i int) time.Duration { if i == 1 { retryBackoff.Reset() } return retryBackoff.NextBackOff() }, // Retry up to 5 attempts // MaxRetries: 5, }) if err != nil { log.Fatalf("Error creating the client: %s", err) } // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // // Create the BulkIndexer // // NOTE: For optimal performance, consider using a third-party JSON decoding package. // See an example in the "benchmarks" folder. // bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: indexName, // The default index name Client: es, // The Elasticsearch client NumWorkers: numWorkers, // The number of worker goroutines FlushBytes: int(flushBytes), // The flush threshold in bytes FlushInterval: 30 * time.Second, // The periodic flush interval }) if err != nil { log.Fatalf("Error creating the indexer: %s", err) } // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< // Generate the articles collection // names := []string{"Alice", "John", "Mary"} for i := 1; i <= numItems; i++ { articles = append(articles, &Article{ ID: i, Title: strings.Join([]string{"Title", strconv.Itoa(i)}, " "), Body: "Lorem ipsum dolor sit amet...", Published: time.Now().Round(time.Second).UTC().AddDate(0, 0, i), Author: Author{ FirstName: names[rand.Intn(len(names))], LastName: "Smith", }, }) } log.Printf("→ Generated %s articles", humanize.Comma(int64(len(articles)))) // Re-create the index // if res, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)); err != nil || res.IsError() { log.Fatalf("Cannot delete index: %s", err) } res.Body.Close() res, err = es.Indices.Create(indexName) if err != nil { log.Fatalf("Cannot create index: %s", err) } if res.IsError() { log.Fatalf("Cannot create index: %s", res) } res.Body.Close() start := time.Now().UTC() // Loop over the collection // for _, a := range articles { // Prepare the data payload: encode article to JSON // data, err := json.Marshal(a) if err != nil { log.Fatalf("Cannot encode article %d: %s", a.ID, err) } // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // // Add an item to the BulkIndexer // err = bi.Add( context.Background(), esutil.BulkIndexerItem{ // Action field configures the operation to perform (index, create, delete, update) Action: "index", // DocumentID is the (optional) document ID DocumentID: strconv.Itoa(a.ID), // Body is an `io.Reader` with the payload Body: bytes.NewReader(data), // OnSuccess is called for each successful operation OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { atomic.AddUint64(&countSuccessful, 1) }, // OnFailure is called for each failed operation OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { log.Printf("ERROR: %s", err) } else { log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) } }, }, ) if err != nil { log.Fatalf("Unexpected error: %s", err) } // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< } // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // Close the indexer // if err := bi.Close(context.Background()); err != nil { log.Fatalf("Unexpected error: %s", err) } // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< biStats := bi.Stats() // Report the results: number of indexed docs, number of errors, duration, indexing rate // log.Println(strings.Repeat("▔", 65)) dur := time.Since(start) if biStats.NumFailed > 0 { log.Fatalf( "Indexed [%s] documents with [%s] errors in %s (%s docs/sec)", humanize.Comma(int64(biStats.NumFlushed)), humanize.Comma(int64(biStats.NumFailed)), dur.Truncate(time.Millisecond), humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))), ) } else { log.Printf( "Sucessfuly indexed [%s] documents in %s (%s docs/sec)", humanize.Comma(int64(biStats.NumFlushed)), dur.Truncate(time.Millisecond), humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))), ) } }