Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alerta support for timeout, tags and service template #1545

Merged
merged 1 commit into from
Sep 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
The breaking change is that the Combine and Flatten nodes previously, but erroneously, operated across batch boundaries; this has been fixed.
- [#1497](https://github.com/influxdata/kapacitor/pull/1497): Add support for Docker Swarm autoscaling services.
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode

### Bugfixes

Expand Down
3 changes: 3 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if len(a.Service) != 0 {
c.Service = a.Service
}
if a.Timeout != 0 {
c.Timeout = a.Timeout
}
h, err := et.tm.AlertaService.Handler(c, l)
if err != nil {
return nil, errors.Wrap(err, "failed to create Alerta handler")
Expand Down
7 changes: 5 additions & 2 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7616,6 +7616,7 @@ stream
.alerta()
.token('testtoken1234567')
.environment('production')
.timeout(1h)
.alerta()
.token('anothertesttoken')
.resource('resource: {{ index .Tags "host" }}')
Expand All @@ -7624,7 +7625,7 @@ stream
.origin('override')
.group('{{ .ID }}')
.value('{{ index .Fields "count" }}')
.services('serviceA', 'serviceB')
.services('serviceA', 'serviceB', '{{ .Name }}')
`
tmInit := func(tm *kapacitor.TaskMaster) {
c := alerta.NewConfig()
Expand All @@ -7648,6 +7649,7 @@ stream
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Timeout: 3600,
},
},
alertatest.Request{
Expand All @@ -7660,8 +7662,9 @@ stream
Environment: "serverA",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "override",
Service: []string{"serviceA", "serviceB"},
Service: []string{"serviceA", "serviceB", "cpu"},
Value: "10",
Timeout: 86400,
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ type HipChatHandler struct {
// .event('Something went wrong')
// .environment('Development')
// .group('Dev. Servers')
// .timeout(5m)
//
// NOTE: Alerta cannot be configured globally because of its required properties.
// tick:property
Expand Down Expand Up @@ -996,6 +997,10 @@ type AlertaHandler struct {
// List of effected Services
// tick:ignore
Service []string `tick:"Services"`

// Alerta timeout.
// Default: 24h
Timeout time.Duration
}

// List of effected services.
Expand Down
12 changes: 10 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5918,6 +5918,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "0s",
},
Redacted: []string{
"token",
Expand All @@ -5934,6 +5935,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "0s",
},
Redacted: []string{
"token",
Expand All @@ -5943,8 +5945,9 @@ func TestServer_UpdateConfig(t *testing.T) {
{
updateAction: client.ConfigUpdateAction{
Set: map[string]interface{}{
"token": "token",
"origin": "kapacitor",
"token": "token",
"origin": "kapacitor",
"timeout": "3h",
},
},
expSection: client.ConfigSection{
Expand All @@ -5959,6 +5962,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "3h0m0s",
},
Redacted: []string{
"token",
Expand All @@ -5975,6 +5979,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "3h0m0s",
},
Redacted: []string{
"token",
Expand Down Expand Up @@ -7352,6 +7357,7 @@ func TestServer_ListServiceTests(t *testing.T) {
"testServiceA",
"testServiceB",
},
"timeout": "24h0m0s",
},
},
{
Expand Down Expand Up @@ -8066,6 +8072,7 @@ func TestServer_AlertHandlers(t *testing.T) {
"origin": "kapacitor",
"group": "test",
"environment": "env",
"timeout": time.Duration(24 * time.Hour),
},
},
setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) {
Expand All @@ -8091,6 +8098,7 @@ func TestServer_AlertHandlers(t *testing.T) {
Text: "message",
Origin: "kapacitor",
Service: []string{"alert"},
Timeout: 86400,
},
}}
if !reflect.DeepEqual(exp, got) {
Expand Down
1 change: 1 addition & 0 deletions services/alerta/alertatest/alertatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ type PostData struct {
Origin string `json:"origin"`
Service []string `json:"service"`
Value string `json:"value"`
Timeout int64 `json:"timeout"`
}
3 changes: 3 additions & 0 deletions services/alerta/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alerta
import (
"net/url"

"github.com/influxdata/influxdb/toml"
"github.com/pkg/errors"
)

Expand All @@ -22,6 +23,8 @@ type Config struct {
Environment string `toml:"environment" override:"environment"`
// The origin of the alert.
Origin string `toml:"origin" override:"origin"`
// Optional timeout, can be overridden per alert.
Timeout toml.Duration `toml:"timeout" override:"timeout"`
}

func NewConfig() Config {
Expand Down
54 changes: 49 additions & 5 deletions services/alerta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path"
"sync/atomic"
text "text/template"
"time"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/models"
Expand All @@ -22,6 +23,7 @@ const (
defaultResource = "{{ .Name }}"
defaultEvent = "{{ .ID }}"
defaultGroup = "{{ .Group }}"
defaultTimeout = time.Duration(24 * time.Hour)
defaultTokenPrefix = "Bearer"
)

Expand Down Expand Up @@ -55,6 +57,7 @@ type testOptions struct {
Message string `json:"message"`
Origin string `json:"origin"`
Service []string `json:"service"`
Timeout string `json:"timeout"`
}

func (s *Service) TestOptions() interface{} {
Expand All @@ -69,6 +72,7 @@ func (s *Service) TestOptions() interface{} {
Message: "test alerta message",
Origin: c.Origin,
Service: []string{"testServiceA", "testServiceB"},
Timeout: "24h0m0s",
}
}

Expand All @@ -78,6 +82,7 @@ func (s *Service) Test(options interface{}) error {
return fmt.Errorf("unexpected options type %T", options)
}
c := s.config()
timeout, _ := time.ParseDuration(o.Timeout)
return s.Alert(
c.Token,
c.TokenPrefix,
Expand All @@ -90,6 +95,8 @@ func (s *Service) Test(options interface{}) error {
o.Message,
o.Origin,
o.Service,
timeout,
map[string]string{},
models.Result{},
)
}
Expand Down Expand Up @@ -125,12 +132,12 @@ func (s *Service) Update(newConfig []interface{}) error {
return nil
}

func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, data models.Result) error {
func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) error {
if resource == "" || event == "" {
return errors.New("Resource and Event are required to send an alert")
}

req, err := s.preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin, service, data)
req, err := s.preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin, service, timeout, tags, data)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,7 +165,7 @@ func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severi
return nil
}

func (s *Service) preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, data models.Result) (*http.Request, error) {
func (s *Service) preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) (*http.Request, error) {
c := s.config()

if !c.Enabled {
Expand Down Expand Up @@ -204,6 +211,13 @@ func (s *Service) preparePost(token, tokenPrefix, resource, event, environment,
if len(service) > 0 {
postData["service"] = service
}
postData["timeout"] = int64(timeout / time.Second)

tagList := make([]string, 0)
for k, v := range tags {
tagList = append(tagList, fmt.Sprintf("%s=%s", k, v))
}
postData["tags"] = tagList

var post bytes.Buffer
enc := json.NewEncoder(&post)
Expand Down Expand Up @@ -262,6 +276,10 @@ type HandlerConfig struct {

// List of effected Services
Service []string `mapstructure:"service"`

// Alerta timeout.
// Default: 24h
Timeout time.Duration `mapstructure:"timeout"`
}

type handler struct {
Expand All @@ -274,13 +292,15 @@ type handler struct {
environmentTmpl *text.Template
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
}

func (s *Service) DefaultHandlerConfig() HandlerConfig {
return HandlerConfig{
Resource: defaultResource,
Event: defaultEvent,
Group: defaultGroup,
Timeout: defaultTimeout,
}
}

Expand All @@ -306,6 +326,16 @@ func (s *Service) Handler(c HandlerConfig, l *log.Logger) (alert.Handler, error)
if err != nil {
return nil, err
}

var stmpl []*text.Template
for _, service := range c.Service {
tmpl, err := text.New("service").Parse(service)
if err != nil {
return nil, err
}
stmpl = append(stmpl, tmpl)
}

return &handler{
s: s,
c: c,
Expand All @@ -315,6 +345,7 @@ func (s *Service) Handler(c HandlerConfig, l *log.Logger) (alert.Handler, error)
environmentTmpl: etmpl,
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
}, nil
}

Expand Down Expand Up @@ -382,10 +413,21 @@ func (h *handler) Handle(event alert.Event) {
return
}
value := buf.String()
buf.Reset()

service := h.c.Service
if len(service) == 0 {
var service []string
if len(h.serviceTmpl) == 0 {
service = []string{td.Name}
} else {
for _, tmpl := range h.serviceTmpl {
err = tmpl.Execute(&buf, td)
if err != nil {
h.logger.Printf("E! failed to evaluate Alerta Service template: %v", err)
return
}
service = append(service, buf.String())
buf.Reset()
}
}

var severity string
Expand Down Expand Up @@ -415,6 +457,8 @@ func (h *handler) Handle(event alert.Event) {
event.State.Message,
h.c.Origin,
service,
h.c.Timeout,
event.Data.Tags,
event.Data.Result,
); err != nil {
h.logger.Printf("E! failed to send event to Alerta: %v", err)
Expand Down