Skip to content

Commit 493dd6b

Browse files
Expose BulkIndexer total flushed bytes metric (#914) (#931)
Co-authored-by: Aurèle Oulès <aurele@oules.com>
1 parent 563e1d7 commit 493dd6b

File tree

1 file changed

+32
-25
lines changed

1 file changed

+32
-25
lines changed

esutil/bulk_indexer.go

+32-25
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,15 @@ type BulkIndexerConfig struct {
8686

8787
// BulkIndexerStats represents the indexer statistics.
8888
type BulkIndexerStats struct {
89-
NumAdded uint64
90-
NumFlushed uint64
91-
NumFailed uint64
92-
NumIndexed uint64
93-
NumCreated uint64
94-
NumUpdated uint64
95-
NumDeleted uint64
96-
NumRequests uint64
89+
NumAdded uint64
90+
NumFlushed uint64
91+
NumFailed uint64
92+
NumIndexed uint64
93+
NumCreated uint64
94+
NumUpdated uint64
95+
NumDeleted uint64
96+
NumRequests uint64
97+
FlushedBytes uint64
9798
}
9899

99100
// BulkIndexerItem represents an indexer item.
@@ -266,14 +267,15 @@ type bulkIndexer struct {
266267
}
267268

268269
type bulkIndexerStats struct {
269-
numAdded uint64
270-
numFlushed uint64
271-
numFailed uint64
272-
numIndexed uint64
273-
numCreated uint64
274-
numUpdated uint64
275-
numDeleted uint64
276-
numRequests uint64
270+
numAdded uint64
271+
numFlushed uint64
272+
numFailed uint64
273+
numIndexed uint64
274+
numCreated uint64
275+
numUpdated uint64
276+
numDeleted uint64
277+
numRequests uint64
278+
flushedBytes uint64
277279
}
278280

279281
// NewBulkIndexer creates a new bulk indexer.
@@ -354,14 +356,15 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
354356
// Stats returns indexer statistics.
355357
func (bi *bulkIndexer) Stats() BulkIndexerStats {
356358
return BulkIndexerStats{
357-
NumAdded: atomic.LoadUint64(&bi.stats.numAdded),
358-
NumFlushed: atomic.LoadUint64(&bi.stats.numFlushed),
359-
NumFailed: atomic.LoadUint64(&bi.stats.numFailed),
360-
NumIndexed: atomic.LoadUint64(&bi.stats.numIndexed),
361-
NumCreated: atomic.LoadUint64(&bi.stats.numCreated),
362-
NumUpdated: atomic.LoadUint64(&bi.stats.numUpdated),
363-
NumDeleted: atomic.LoadUint64(&bi.stats.numDeleted),
364-
NumRequests: atomic.LoadUint64(&bi.stats.numRequests),
359+
NumAdded: atomic.LoadUint64(&bi.stats.numAdded),
360+
NumFlushed: atomic.LoadUint64(&bi.stats.numFlushed),
361+
NumFailed: atomic.LoadUint64(&bi.stats.numFailed),
362+
NumIndexed: atomic.LoadUint64(&bi.stats.numIndexed),
363+
NumCreated: atomic.LoadUint64(&bi.stats.numCreated),
364+
NumUpdated: atomic.LoadUint64(&bi.stats.numUpdated),
365+
NumDeleted: atomic.LoadUint64(&bi.stats.numDeleted),
366+
NumRequests: atomic.LoadUint64(&bi.stats.numRequests),
367+
FlushedBytes: atomic.LoadUint64(&bi.stats.flushedBytes),
365368
}
366369
}
367370

@@ -508,7 +511,9 @@ func (w *worker) flushBuffer(ctx context.Context) error {
508511
defer func() { w.bi.config.OnFlushEnd(ctx) }()
509512
}
510513

511-
if w.buf.Len() < 1 {
514+
bufLen := w.buf.Len()
515+
516+
if bufLen < 1 {
512517
if w.bi.config.DebugLogger != nil {
513518
w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: Buffer empty\n", w.id)
514519
}
@@ -631,6 +636,8 @@ func (w *worker) flushBuffer(ctx context.Context) error {
631636
}
632637
}
633638

639+
atomic.AddUint64(&w.bi.stats.flushedBytes, uint64(bufLen))
640+
634641
return err
635642
}
636643

0 commit comments

Comments
 (0)