
Commit 3f14d7d: Group By Fields (influxdata#731)

Author: Nathaniel Cook
1 parent 0d92886, commit 3f14d7d


42 files changed (+1353, -354 lines)

CHANGELOG.md (+48 lines)

@@ -4,9 +4,57 @@

### Release Notes

#### Group By Fields

Kapacitor now supports grouping by fields.
First convert a field into a tag using the EvalNode.
Then group by the new tag.

Example:

```go
stream
    |from()
        .measurement('alerts')
    // Convert field 'level' to tag.
    |eval(lambda: string("level"))
        .as('level')
        .tags('level')
    // Group by new tag 'level'.
    |groupBy('alert', 'level')
    |...
```

Note the field `level` is now removed from the point since `.keep` was not used.
See the [docs](https://docs.influxdata.com/kapacitor/v1.0/nodes/eval_node/#tags) for more details on how `.tags` works.
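
For comparison, here is a sketch of the same pipeline with `.keep()` added; assuming `.keep()` with no arguments retains all existing fields (as described in the EvalNode docs linked above), the `level` field would then remain on each point alongside the new tag:

```go
stream
    |from()
        .measurement('alerts')
    // Convert field 'level' to tag, keeping the original fields as well.
    |eval(lambda: string("level"))
        .as('level')
        .tags('level')
        .keep()
    // Group by new tag 'level'; each point still carries the 'level' field.
    |groupBy('alert', 'level')
    |...
```
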

#### Delete Fields or Tags

As a companion to being able to create new tags, you can now delete tags or fields.

Example:

```go
stream
    |from()
        .measurement('alerts')
    |delete()
        // Remove the field `extra` and tag `uuid` from all points.
        .field('extra')
        .tag('uuid')
    |...
```

### Features

- [#702](https://github.com/influxdata/kapacitor/pull/702): Add plumbing for authentication backends.
- [#624](https://github.com/influxdata/kapacitor/issue/624): BREAKING: Add ability to GroupBy fields. First use EvalNode to create a tag from a field and then group by the new tag. Also allows for grouping by measurement (see the sketch below). The breaking change is that the group ID format has changed to allow for the measurement name.
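
The grouping-by-measurement half of this change is sketched below; this is a hedged example that assumes the `GroupByMeasurementFlag` added to the pipeline QueryNode in this commit is exposed in TICKscript as a `.groupByMeasurement()` property on the batch query node:

```go
batch
    |query('SELECT mean("value") FROM "telegraf"."autogen"./cpu|mem/')
        .period(1m)
        .every(1m)
        // Keep each measurement in its own group instead of merging results.
        .groupByMeasurement()
        .groupBy('host')
    |...
```

Because the group ID format now includes the measurement name, points from different measurements land in distinct groups even when their tag sets match.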

### Bugfixes

alert.go (+1 line)

```diff
@@ -385,6 +385,7 @@ func (a *AlertNode) runAlert([]byte) error {
     batch := models.Batch{
         Name:   p.Name,
         Group:  p.Group,
+        ByName: p.Dimensions.ByName,
         Tags:   p.Tags,
         Points: []models.BatchPoint{models.BatchPointFromPoint(p)},
     }
```

batch.go (+13, -5)

```diff
@@ -95,17 +95,19 @@ func (s *BatchNode) Abort() {
 }
 
 type BatchQueries struct {
-    Queries []string
-    Cluster string
+    Queries            []string
+    Cluster            string
+    GroupByMeasurement bool
 }
 
 func (s *BatchNode) Queries(start, stop time.Time) []BatchQueries {
     queries := make([]BatchQueries, len(s.children))
     for i, b := range s.children {
         qn := b.(*QueryNode)
         queries[i] = BatchQueries{
-            Queries: qn.Queries(start, stop),
-            Cluster: qn.Cluster(),
+            Queries:            qn.Queries(start, stop),
+            Cluster:            qn.Cluster(),
+            GroupByMeasurement: qn.GroupByMeasurement(),
         }
     }
     return queries
@@ -136,6 +138,7 @@ type QueryNode struct {
     connectErrors  *expvar.Int
     batchesQueried *expvar.Int
     pointsQueried  *expvar.Int
+    byName         bool
 }
 
 func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, l *log.Logger) (*QueryNode, error) {
@@ -144,6 +147,7 @@ func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, l *log.Logger) (*Que
         b:        n,
         closing:  make(chan struct{}),
         aborting: make(chan struct{}),
+        byName:   n.GroupByMeasurementFlag,
     }
     bn.node.runF = bn.runBatch
     bn.node.stopF = bn.stopBatch
@@ -196,6 +200,10 @@ func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, l *log.Logger) (*Que
     return bn, nil
 }
 
+func (b *QueryNode) GroupByMeasurement() bool {
+    return b.byName
+}
+
 // Return list of databases and retention policies
 // the batcher will query.
 func (b *QueryNode) DBRPs() ([]DBRP, error) {
@@ -319,7 +327,7 @@ func (b *QueryNode) doQuery() error {
 
     // Collect batches
     for _, res := range resp.Results {
-        batches, err := models.ResultToBatches(res)
+        batches, err := models.ResultToBatches(res, b.byName)
         if err != nil {
             b.logger.Println("E! failed to understand query result:", err)
             b.queryErrors.Add(1)
```

delete.go (new file, +97 lines)

```go
package kapacitor

import (
    "log"

    "github.com/influxdata/kapacitor/expvar"
    "github.com/influxdata/kapacitor/models"
    "github.com/influxdata/kapacitor/pipeline"
)

const (
    statsFieldsDeleted = "fields_deleted"
    statsTagsDeleted   = "tags_deleted"
)

type DeleteNode struct {
    node
    d *pipeline.DeleteNode

    fieldsDeleted *expvar.Int
    tagsDeleted   *expvar.Int
}

// Create a new DeleteNode which removes the configured fields and tags from each point.
func newDeleteNode(et *ExecutingTask, n *pipeline.DeleteNode, l *log.Logger) (*DeleteNode, error) {
    dn := &DeleteNode{
        node: node{Node: n, et: et, logger: l},
        d:    n,
    }
    dn.node.runF = dn.runDelete
    return dn, nil
}

func (e *DeleteNode) runDelete(snapshot []byte) error {
    e.fieldsDeleted = &expvar.Int{}
    e.tagsDeleted = &expvar.Int{}

    e.statMap.Set(statsFieldsDeleted, e.fieldsDeleted)
    e.statMap.Set(statsTagsDeleted, e.tagsDeleted)
    switch e.Provides() {
    case pipeline.StreamEdge:
        for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
            e.timer.Start()
            p.Fields, p.Tags = e.doDeletes(p.Fields, p.Tags)
            e.timer.Stop()
            for _, child := range e.outs {
                err := child.CollectPoint(p)
                if err != nil {
                    return err
                }
            }
        }
    case pipeline.BatchEdge:
        for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
            e.timer.Start()
            for i := range b.Points {
                b.Points[i].Fields, b.Points[i].Tags = e.doDeletes(b.Points[i].Fields, b.Points[i].Tags)
            }
            e.timer.Stop()
            for _, child := range e.outs {
                err := child.CollectBatch(b)
                if err != nil {
                    return err
                }
            }
        }
    }
    return nil
}

// doDeletes removes the configured fields and tags, copying the maps only if something is actually deleted.
func (d *DeleteNode) doDeletes(fields models.Fields, tags models.Tags) (models.Fields, models.Tags) {
    newFields := fields
    fieldsCopied := false
    for _, field := range d.d.Fields {
        if _, ok := fields[field]; ok {
            if !fieldsCopied {
                newFields = newFields.Copy()
                fieldsCopied = true
            }
            d.fieldsDeleted.Add(1)
            delete(newFields, field)
        }
    }
    newTags := tags
    tagsCopied := false
    for _, tag := range d.d.Tags {
        if _, ok := tags[tag]; ok {
            if !tagsCopied {
                newTags = newTags.Copy()
                tagsCopied = true
            }
            d.tagsDeleted.Add(1)
            delete(newTags, tag)
        }
    }
    return newFields, newTags
}
```

edge.go (+4, -4)

```diff
@@ -88,11 +88,11 @@ type edgeStat struct {
     collected int64
     emitted   int64
     tags      models.Tags
-    dims      []string
+    dims      models.Dimensions
 }
 
 // Get a snapshot of the current group statistics for this edge
-func (e *Edge) readGroupStats(f func(group models.GroupID, collected, emitted int64, tags models.Tags, dims []string)) {
+func (e *Edge) readGroupStats(f func(group models.GroupID, collected, emitted int64, tags models.Tags, dims models.Dimensions)) {
     e.groupMu.RLock()
     defer e.groupMu.RUnlock()
     for group, stats := range e.groupStats {
@@ -188,7 +188,7 @@ func (e *Edge) CollectBatch(b models.Batch) error {
 }
 
 // Increment the emitted count of the group for this edge.
-func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims []string, count int64) {
+func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims models.Dimensions, count int64) {
     // we are "manually" calling Unlock() and not using defer, because this method is called
     // in hot locations (NextPoint/CollectPoint) and defer have some performance penalty
     e.groupMu.Lock()
@@ -208,7 +208,7 @@ func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims []string,
 }
 
 // Increment the collected count of the group for this edge.
-func (e *Edge) incCollected(group models.GroupID, tags models.Tags, dims []string, count int64) {
+func (e *Edge) incCollected(group models.GroupID, tags models.Tags, dims models.Dimensions, count int64) {
     // we are "manually" calling Unlock() and not using defer, because this method is called
     // in hot locations (NextPoint/CollectPoint) and defer have some performance penalty
     e.groupMu.Lock()
```

edge_test.go (+3, -2)

```diff
@@ -11,18 +11,19 @@ import (
 )
 
 func Benchmark_CollectPoint(b *testing.B) {
+    name := "point"
     b.ReportAllocs()
     ls := &logService{}
     e := newEdge("BCollectPoint", "parent", "child", pipeline.StreamEdge, defaultEdgeBufferSize, ls)
     p := models.Point{
-        Name: "point",
+        Name: name,
         Tags: models.Tags{
             "tag1": "value1",
             "tag2": "value2",
             "tag3": "value3",
             "tag4": "value4",
         },
-        Group: models.NilGroup,
+        Group: models.ToGroupID(name, nil, models.Dimensions{}),
         Fields: models.Fields{
             "field1": 42,
             "field2": 4.2,
```

eval.go (+37, -10)

```diff
@@ -24,6 +24,7 @@ type EvalNode struct {
     expressionsByGroup map[models.GroupID][]stateful.Expression
     refVarList         [][]string
     scopePool          stateful.ScopePool
+    tags               map[string]bool
 
     evalErrors *expvar.Int
 }
@@ -55,6 +56,14 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
     // Create a single pool for the combination of all expressions
     en.scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(expressions...))
 
+    // Create map of tags
+    if l := len(n.TagsList); l > 0 {
+        en.tags = make(map[string]bool, l)
+        for _, tag := range n.TagsList {
+            en.tags[tag] = true
+        }
+    }
+
     en.node.runF = en.runEval
     return en, nil
 }
@@ -67,7 +76,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
     var err error
     for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
         e.timer.Start()
-        p.Fields, err = e.eval(p.Time, p.Group, p.Fields, p.Tags)
+        p.Fields, p.Tags, err = e.eval(p.Time, p.Group, p.Fields, p.Tags)
         if err != nil {
             e.evalErrors.Add(1)
             if !e.e.QuiteFlag {
@@ -91,7 +100,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
             e.timer.Start()
             for i := 0; i < len(b.Points); {
                 p := b.Points[i]
-                b.Points[i].Fields, err = e.eval(p.Time, b.Group, p.Fields, p.Tags)
+                b.Points[i].Fields, b.Points[i].Tags, err = e.eval(p.Time, b.Group, p.Fields, p.Tags)
                 if err != nil {
                     e.evalErrors.Add(1)
                     if !e.e.QuiteFlag {
@@ -115,7 +124,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
     return nil
 }
 
-func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags map[string]string) (models.Fields, error) {
+func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags models.Tags) (models.Fields, models.Tags, error) {
     vars := e.scopePool.Get()
     defer e.scopePool.Put(vars)
     expressions, ok := e.expressionsByGroup[group]
@@ -129,23 +138,38 @@ func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Field
     for i, expr := range expressions {
         err := fillScope(vars, e.refVarList[i], now, fields, tags)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         v, err := expr.Eval(vars)
         if err != nil {
-            return nil, err
+            return nil, nil, err
         }
         name := e.e.AsList[i]
         vars.Set(name, v)
     }
+    newTags := tags
+    if len(e.tags) > 0 {
+        newTags = newTags.Copy()
+        for tag := range e.tags {
+            v, err := vars.Get(tag)
+            if err != nil {
+                return nil, nil, err
+            }
+            if s, ok := v.(string); !ok {
+                return nil, nil, fmt.Errorf("result of a tag expression must be of type string, got %T", v)
+            } else {
+                newTags[tag] = s
+            }
+        }
+    }
     var newFields models.Fields
     if e.e.KeepFlag {
         if l := len(e.e.KeepList); l != 0 {
             newFields = make(models.Fields, l)
             for _, f := range e.e.KeepList {
                 v, err := vars.Get(f)
                 if err != nil {
-                    return nil, err
+                    return nil, nil, err
                 }
                 newFields[f] = v
             }
@@ -157,20 +181,23 @@ func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Field
             for _, f := range e.e.AsList {
                 v, err := vars.Get(f)
                 if err != nil {
-                    return nil, err
+                    return nil, nil, err
                 }
                 newFields[f] = v
             }
         }
     } else {
-        newFields = make(models.Fields, len(e.e.AsList))
+        newFields = make(models.Fields, len(e.e.AsList)-len(e.tags))
         for _, f := range e.e.AsList {
+            if e.tags[f] {
+                continue
+            }
             v, err := vars.Get(f)
             if err != nil {
-                return nil, err
+                return nil, nil, err
             }
             newFields[f] = v
         }
     }
-    return newFields, nil
+    return newFields, newTags, nil
 }
```
