diff --git a/.gitignore b/.gitignore index 5c947e706..2bce2c70b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ kapacitor*.zip *.pyc *.test /test-logs +*.prof diff --git a/CHANGELOG.md b/CHANGELOG.md index dfcdb318f..473a5282d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### Release Notes ### Features +- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons. + ### Bugfixes - [#199](https://github.com/influxdata/kapacitor/issues/199): BREAKING: Various fixes for the Alerta integration. diff --git a/integrations/data/TestStream_Shift.srpl b/integrations/data/TestStream_Shift.srpl new file mode 100644 index 000000000..f0792eabd --- /dev/null +++ b/integrations/data/TestStream_Shift.srpl @@ -0,0 +1,78 @@ +dbname +rpname +cpu,type=idle,host=serverA value=97.1 0000000001 +dbname +rpname +cpu,type=idle,host=serverB value=97.1 0000000001 +dbname +rpname +disk,type=sda,host=serverB value=39 0000000001 +dbname +rpname +cpu,type=idle,host=serverB value=92.6 0000000002 +dbname +rpname +cpu,type=idle,host=serverA value=95.6 0000000003 +dbname +rpname +cpu,type=idle,host=serverB value=95.6 0000000003 +dbname +rpname +cpu,type=idle,host=serverA value=93.1 0000000004 +dbname +rpname +cpu,type=idle,host=serverB value=93.1 0000000004 +dbname +rpname +cpu,type=idle,host=serverA value=92.6 0000000005 +dbname +rpname +cpu,type=idle,host=serverB value=92.6 0000000005 +dbname +rpname +cpu,type=idle,host=serverA value=95.8 0000000006 +dbname +rpname +cpu,type=idle,host=serverB value=95.8 0000000006 +dbname +rpname +cpu,type=idle,host=serverC value=95.8 0000000006 +dbname +rpname +cpu,type=idle,host=serverA value=92.7 0000000007 +dbname +rpname +cpu,type=idle,host=serverB value=92.7 0000000007 +dbname +rpname +cpu,type=idle,host=serverA value=96.0 0000000008 +dbname +rpname +cpu,type=idle,host=serverB value=96.0 0000000008 +dbname +rpname +cpu,type=idle,host=serverA value=93.4 0000000009 +dbname +rpname +cpu,type=idle,host=serverB value=93.4 0000000009 +dbname +rpname +disk,type=sda,host=serverB value=423 0000000009 +dbname +rpname +cpu,type=idle,host=serverA value=95.3 0000000010 +dbname +rpname +cpu,type=idle,host=serverB value=95.3 0000000010 +dbname +rpname +cpu,type=idle,host=serverA value=96.4 0000000011 +dbname +rpname +cpu,type=idle,host=serverB value=96.4 0000000011 +dbname +rpname +cpu,type=idle,host=serverA value=95.1 0000000012 +dbname +rpname +cpu,type=idle,host=serverB value=95.1 0000000012 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 8b895a8b8..f9cb121ee 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -228,6 +228,210 @@ stream testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, nil, false) } +func TestStream_Shift(t *testing.T) { + + var script = ` +var period = 5s + +var data = stream + .from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + +var past = data + .window() + .period(period) + .every(period) + .align() + .mapReduce(influxql.count('value')) + .shift(period) + +var current = data + .window() + .period(period) + .every(period) + .align() + .mapReduce(influxql.count('value')) + +past.join(current) + .as('past', 'current') + .eval(lambda: "current.count" - "past.count") + .keep() + .as('diff') + .httpOut('TestStream_Shift') +` + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: nil, + Columns: []string{"time", "current.count", "diff", "past.count"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 5.0, + 1.0, + 4.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false) +} + +func TestStream_ShiftBatch(t *testing.T) { + + var script = ` +var period = 5s + +var data = stream + .from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + +var past = data + .window() + .period(period) + .every(period) + .align() + .shift(period) + .mapReduce(influxql.count('value')) + +var current = data + .window() + .period(period) + .every(period) + .align() + .mapReduce(influxql.count('value')) + +past.join(current) + .as('past', 'current') + .eval(lambda: "current.count" - "past.count") + .keep() + .as('diff') + .httpOut('TestStream_Shift') +` + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: nil, + Columns: []string{"time", "current.count", "diff", "past.count"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 5.0, + 1.0, + 4.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false) +} + +func TestStream_ShiftNegative(t *testing.T) { + + var script = ` +var period = 5s + +var data = stream + .from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + +var past = data + .window() + .period(period) + .every(period) + .align() + .mapReduce(influxql.count('value')) + +var current = data + .window() + .period(period) + .every(period) + .align() + .mapReduce(influxql.count('value')) + .shift(-period) + +past.join(current) + .as('past', 'current') + .eval(lambda: "current.count" - "past.count") + .keep() + .as('diff') + .httpOut('TestStream_Shift') +` + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: nil, + Columns: []string{"time", "current.count", "diff", "past.count"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 5.0, + 1.0, + 4.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false) +} + +func TestStream_ShiftBatchNegative(t *testing.T) { + + var script = ` +var period = 5s + +var data = stream + .from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + +var past = data + .window() + .period(period) + .every(period) + .align() + .mapReduce(influxql.count('value')) + +var current = data + .window() + .period(period) + .every(period) + .align() + .shift(-period) + .mapReduce(influxql.count('value')) + +past.join(current) + .as('past', 'current') + .eval(lambda: "current.count" - "past.count") + .keep() + .as('diff') + .httpOut('TestStream_Shift') +` + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: nil, + Columns: []string{"time", "current.count", "diff", "past.count"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 5.0, + 1.0, + 4.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false) +} + func TestStream_SimpleMR(t *testing.T) { var script = ` diff --git a/pipeline/node.go b/pipeline/node.go index 77de18d07..16aaf19ba 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -428,3 +428,10 @@ func (n *chainnode) Derivative(field string) *DerivativeNode { n.linkChild(s) return s } + +// Create a new node that shifts the incoming points or batches in time. +func (n *chainnode) Shift(shift time.Duration) *ShiftNode { + s := newShiftNode(n.Provides(), shift) + n.linkChild(s) + return s +} diff --git a/pipeline/shift.go b/pipeline/shift.go new file mode 100644 index 000000000..670cfc39e --- /dev/null +++ b/pipeline/shift.go @@ -0,0 +1,34 @@ +package pipeline + +import ( + "time" +) + +// Shift points and batches in time, this is useful for comparing +// batches or points from different times. +// +// Example: +// stream +// .shift(5m) +// +// Shift all data points 5m forward in time. +// +// Example: +// stream +// .shift(-10s) +// +// Shift all data points 10s backward in time. +type ShiftNode struct { + chainnode + + // Keep one point or batch every Duration + // tick:ignore + Shift time.Duration +} + +func newShiftNode(wants EdgeType, shift time.Duration) *ShiftNode { + return &ShiftNode{ + chainnode: newBasicChainNode("shift", wants, wants), + Shift: shift, + } +} diff --git a/shift.go b/shift.go new file mode 100644 index 000000000..3ca424015 --- /dev/null +++ b/shift.go @@ -0,0 +1,59 @@ +package kapacitor + +import ( + "errors" + "log" + "time" + + "github.com/influxdata/kapacitor/pipeline" +) + +type ShiftNode struct { + node + s *pipeline.ShiftNode + + shift time.Duration +} + +// Create a new ShiftNode which shifts points and batches in time. +func newShiftNode(et *ExecutingTask, n *pipeline.ShiftNode, l *log.Logger) (*ShiftNode, error) { + sn := &ShiftNode{ + node: node{Node: n, et: et, logger: l}, + s: n, + shift: n.Shift, + } + sn.node.runF = sn.runShift + if n.Shift == 0 { + return nil, errors.New("invalid shift value: must be non zero duration") + } + return sn, nil +} + +func (s *ShiftNode) runShift([]byte) error { + switch s.Wants() { + case pipeline.StreamEdge: + for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() { + p.Time = p.Time.Add(s.shift) + for _, child := range s.outs { + err := child.CollectPoint(p) + if err != nil { + return err + } + } + } + case pipeline.BatchEdge: + for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() { + b.TMax = b.TMax.Add(s.shift) + for i, p := range b.Points { + b.Points[i].Time = p.Time.Add(s.shift) + } + for _, child := range s.outs { + err := child.CollectBatch(b) + if err != nil { + return err + } + } + } + } + return nil +} diff --git a/task.go b/task.go index ac66f0b7c..38bfcd785 100644 --- a/task.go +++ b/task.go @@ -340,6 +340,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (Node, error return newUDFNode(et, t, l) case *pipeline.StatsNode: return newStatsNode(et, t, l) + case *pipeline.ShiftNode: + return newShiftNode(et, t, l) default: return nil, fmt.Errorf("unknown pipeline node type %T", p) } diff --git a/tick/eval.go b/tick/eval.go index e31dc4f95..ce49f6419 100644 --- a/tick/eval.go +++ b/tick/eval.go @@ -75,7 +75,10 @@ func eval(n Node, scope *Scope, stck *stack) (err error) { if err != nil { return } - evalUnary(node.Operator, scope, stck) + err := evalUnary(node.Operator, scope, stck) + if err != nil { + return err + } case *LambdaNode: // Catch panic from resolveIdents and return as error. err = func() (e error) { @@ -155,15 +158,31 @@ func evalUnary(op tokenType, scope *Scope, stck *stack) error { v := stck.Pop() switch op { case TokenMinus: + if ident, ok := v.(*IdentifierNode); ok { + value, err := scope.Get(ident.Ident) + if err != nil { + return err + } + v = value + } switch n := v.(type) { case float64: stck.Push(-1 * n) case int64: stck.Push(-1 * n) + case time.Duration: + stck.Push(-1 * n) default: return fmt.Errorf("invalid arugument to '-' %v", v) } case TokenNot: + if ident, ok := v.(*IdentifierNode); ok { + value, err := scope.Get(ident.Ident) + if err != nil { + return err + } + v = value + } if b, ok := v.(bool); ok { stck.Push(!b) } else { diff --git a/tick/eval_test.go b/tick/eval_test.go index 5a798f055..06ed9d772 100644 --- a/tick/eval_test.go +++ b/tick/eval_test.go @@ -173,3 +173,68 @@ func TestEvaluate_DynamicMethod(t *testing.T) { t.Errorf("unexpected x.args[1]: got %v exp %v", got, exp) } } + +func TestEvaluate_Vars(t *testing.T) { + script := ` +var x = 3m +var y = -x + +var n = TRUE +var m = !n +` + + scope := tick.NewScope() + err := tick.Evaluate(script, scope) + if err != nil { + t.Fatal(err) + } + + x, err := scope.Get("x") + if err != nil { + t.Fatal(err) + } + if value, ok := x.(time.Duration); ok { + if exp, got := time.Minute*3, value; exp != got { + t.Errorf("unexpected x value: exp %v got %v", exp, got) + } + } else { + t.Errorf("unexpected x value type: exp time.Duration got %T", x) + } + + y, err := scope.Get("y") + if err != nil { + t.Fatal(err) + } + if value, ok := y.(time.Duration); ok { + if exp, got := time.Minute*-3, value; exp != got { + t.Errorf("unexpected y value: exp %v got %v", exp, got) + } + } else { + t.Errorf("unexpected y value type: exp time.Duration got %T", x) + } + + n, err := scope.Get("n") + if err != nil { + t.Fatal(err) + } + if value, ok := n.(bool); ok { + if exp, got := true, value; exp != got { + t.Errorf("unexpected n value: exp %v got %v", exp, got) + } + } else { + t.Errorf("unexpected m value type: exp bool got %T", x) + } + + m, err := scope.Get("m") + if err != nil { + t.Fatal(err) + } + if value, ok := m.(bool); ok { + if exp, got := false, value; exp != got { + t.Errorf("unexpected m value: exp %v got %v", exp, got) + } + } else { + t.Errorf("unexpected m value type: exp bool got %T", x) + } + +} diff --git a/tick/parser_test.go b/tick/parser_test.go index 01efba366..e2c83229c 100644 --- a/tick/parser_test.go +++ b/tick/parser_test.go @@ -364,6 +364,42 @@ func TestParseStatements(t *testing.T) { }, }, }, + { + script: `var x = 3m + var y = -x`, + Root: &ListNode{ + Nodes: []Node{ + &BinaryNode{ + pos: 6, + Operator: TokenAsgn, + Left: &IdentifierNode{ + pos: 4, + Ident: "x", + }, + Right: &DurationNode{ + pos: 8, + Dur: time.Minute * 3, + }, + }, + &BinaryNode{ + pos: 20, + Operator: TokenAsgn, + Left: &IdentifierNode{ + pos: 18, + Ident: "y", + }, + Right: &UnaryNode{ + pos: 22, + Operator: TokenMinus, + Node: &IdentifierNode{ + pos: 23, + Ident: "x", + }, + }, + }, + }, + }, + }, { script: `var t = 42 stream.where(lambda: "value" > t)