-
Notifications
You must be signed in to change notification settings - Fork 626
/
Copy pathconsumer.go
119 lines (98 loc) · 2.9 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package consumer
import (
"bytes"
"context"
"fmt"
"time"
"github.com/segmentio/kafka-go"
"go.elastic.co/apm"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
// Consumer reads messages from a Kafka topic and forwards each message value
// to a bulk indexer.
type Consumer struct {
	// BrokerURL is the Kafka broker address; Run uses it as the single entry
	// of the reader's broker list.
	BrokerURL string
	// TopicName is the Kafka topic Run reads from.
	TopicName string
	// Indexer receives one item per consumed message. Run panics when it is nil.
	Indexer esutil.BulkIndexer

	// reader is created by Run; Stats returns a zero value while it is nil.
	reader *kafka.Reader
	// startTime is set when Run starts and is the baseline for Stats.Duration.
	startTime time.Time

	// Running totals that Stats accumulates from successive reader snapshots.
	totalMessages, totalErrors, totalBytes int64
}
// Run creates a Kafka reader for c.BrokerURL / c.TopicName and, in a loop,
// reads one message at a time and adds its value to c.Indexer as a "create"
// action.
//
// Run only returns when reading a message or adding an item fails; the
// returned error is always non-nil and wraps the underlying error (prefixed
// with "reader: " or "indexer: "), so callers can inspect it with errors.Is
// and errors.As. ctx is passed to every read and add call.
//
// The reader created here is closed before Run returns. c.Indexer is supplied
// by the caller and is NOT closed by Run; closing (and thereby flushing) it
// remains the caller's responsibility.
//
// Run panics if c.Indexer is nil.
func (c *Consumer) Run(ctx context.Context) (err error) {
	if c.Indexer == nil {
		panic(fmt.Sprintf("%T.Indexer is nil", c))
	}

	c.startTime = time.Now()
	c.reader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{c.BrokerURL},
		GroupID: "go-elasticsearch-demo",
		Topic:   c.TopicName,
		// MinBytes: 1e+6, // 1MB
		// MaxBytes: 5e+6, // 5MB
		ReadLagInterval: 1 * time.Second,
	})
	// The loop below can only be left through an error return, so the reader
	// must be released with defer. A close failure is reported to APM instead
	// of replacing the error that ended the loop.
	defer func() {
		if closeErr := c.reader.Close(); closeErr != nil {
			apm.DefaultTracer.NewError(closeErr).Send()
		}
	}()

	for {
		msg, readErr := c.reader.ReadMessage(ctx)
		if readErr != nil {
			return fmt.Errorf("reader: %w", readErr)
		}
		// log.Printf("%v/%v/%v:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))

		item := esutil.BulkIndexerItem{
			Action: "create",
			Body:   bytes.NewReader(msg.Value),
			OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
				// Successfully indexed items need no further handling.
				// log.Printf("Indexed %s/%s", res.Index, res.DocumentID)
			},
			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
				// Only a non-nil err is reported to APM. Failures that are
				// described solely by res (res.Error.Type / res.Status) are
				// deliberately not reported, as before.
				if err != nil {
					apm.CaptureError(ctx, err).Send()
				}
			},
		}

		if addErr := c.Indexer.Add(ctx, item); addErr != nil {
			apm.DefaultTracer.NewError(addErr).Send()
			return fmt.Errorf("indexer: %w", addErr)
		}
	}
}
// Stats is a point-in-time summary of a Consumer's activity, as produced by
// (*Consumer).Stats.
type Stats struct {
	// Duration is the time elapsed since the consumer started running.
	Duration time.Duration

	// TotalLag is the lag reported by the reader, TotalMessages, TotalErrors
	// and TotalBytes are the consumer's accumulated counters.
	TotalLag, TotalMessages, TotalErrors, TotalBytes int64

	// Throughput is TotalMessages divided by Duration in seconds.
	Throughput float64
}
// Stats returns a summary of the consumer's activity so far.
//
// A zero Stats is returned while the reader has not been created yet or when
// no Indexer is set. Otherwise the Messages, Errors and Bytes values of the
// current reader statistics are added to the consumer's running totals, and
// those totals are reported together with the reader's lag, the time elapsed
// since the start time, and the message rate (total messages per elapsed
// second).
//
// NOTE(review): adding each snapshot to the totals presumes reader.Stats
// reports counts since the previous call — confirm against kafka-go.
// Stats updates the consumer's counters without synchronization.
func (c *Consumer) Stats() Stats {
	var out Stats
	if c.reader == nil || c.Indexer == nil {
		return out
	}

	elapsed := time.Since(c.startTime)
	snapshot := c.reader.Stats()

	// Fold the latest reader snapshot into the running totals.
	c.totalMessages += snapshot.Messages
	c.totalErrors += snapshot.Errors
	c.totalBytes += snapshot.Bytes

	out.Duration = elapsed
	out.TotalLag = snapshot.Lag
	out.TotalMessages = c.totalMessages
	out.TotalErrors = c.totalErrors
	out.TotalBytes = c.totalBytes
	out.Throughput = float64(c.totalMessages) / elapsed.Seconds()
	return out
}