Skip to content

Commit

Permalink
Add span filtering capability
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed Feb 2, 2021
1 parent 6027312 commit 14009aa
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 34 deletions.
30 changes: 24 additions & 6 deletions processor/filterprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
# Filter Processor

Supported pipeline types: metrics
Supported pipeline types: traces, metrics

The filter processor can be configured to include or exclude metrics based on
metric name in the case of the 'strict' or 'regexp' match types, or based on other
metric attributes in the case of the 'expr' match type. Please refer to
[config.go](./config.go) for the config spec.

It takes a pipeline type, of which only `metrics` is supported, followed by an
action:
It takes a pipeline type, of which only `metrics` and `spans` is supported, where following actions are available:
- `include`: Any names NOT matching filters are excluded from remainder of pipeline
- `exclude`: Any names matching filters are excluded from remainder of pipeline

For the actions the following parameters are required:
For the `metrics` actions the following parameters are required:
- `match_type`: strict|regexp|expr
- `metric_names`: (only for a `match_type` of 'strict' or 'regexp') list of strings or re2 regex patterns
- `expressions`: (only for a `match_type` of 'expr') list of expr expressions (see "Using an 'expr' match_type" below)
- `resource_attributes`: ResourceAttributes defines a list of possible resource attributes to match metrics against. A match occurs if any resource attribute matches at least one expression in this given list.

More details can found at [include/exclude metrics](../README.md#includeexclude-metrics).
For `spans`, following parameters are available:
- `match_type`: strict|regexp|expr
- `services`: list of strings for matching service names
- `span_names`: list of strings for matching span names
- `attributes`: list of attributes to match against

More details can found at [include/exclude](../README.md#includeexclude-metrics).

Examples:

```yaml
processors:
processors:
filter/1:
metrics:
include:
Expand All @@ -39,6 +44,19 @@ processors:
metric_names:
- hello_world
- hello/world
filter/2:
spans:
include:
match_type: regexp
span_names:
# re2 regexp patterns
- ^prefix/.*
- .*/suffix
exclude:
match_type: regexp
span_names:
- ^other_prefix/.*
- .*/other_suffix
```
Refer to the config files in [testdata](./testdata) for detailed
Expand Down
15 changes: 15 additions & 0 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,28 @@ package filterprocessor

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/filterconfig"
"go.opentelemetry.io/collector/internal/processor/filtermetric"
)

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
Metrics MetricFilters `mapstructure:"metrics"`
Spans SpanFilters `mapstructure:"spans"`
}

// SpanFilters filters by Span properties
type SpanFilters struct {
// Include match properties describe spans that should be included in the Collector Service pipeline,
// all other spans should be dropped from further processing.
// If both Include and Exclude are specified, Include filtering occurs first.
Include *filterconfig.MatchProperties `mapstructure:"include"`

// Exclude match properties describe spans that should be excluded from the Collector Service pipeline,
// all other spans should be included.
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`
}

// MetricFilter filters by Metric properties.
Expand Down
44 changes: 44 additions & 0 deletions processor/filterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"path"
"testing"

"go.opentelemetry.io/collector/internal/processor/filterconfig"
"go.opentelemetry.io/collector/internal/processor/filterset"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -311,3 +314,44 @@ func TestLoadingConfigExpr(t *testing.T) {
})
}
}

func TestLoadingConfigSpans(t *testing.T) {
factories, err := componenttest.ExampleComponents()
require.NoError(t, err)
factory := NewFactory()
factories.Processors[configmodels.Type(typeStr)] = factory
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_spans.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, config)

tests := []struct {
filterName string
expCfg configmodels.Processor
}{
{
filterName: "filter/spans",
expCfg: &Config{
ProcessorSettings: configmodels.ProcessorSettings{
NameVal: "filter/spans",
TypeVal: typeStr,
},
Spans: SpanFilters{
Include: &filterconfig.MatchProperties{
Config: filterset.Config{MatchType: "regexp"},
SpanNames: []string{"prefix/.*", ".*/suffix"},
},
Exclude: &filterconfig.MatchProperties{
Config: filterset.Config{MatchType: "regexp"},
SpanNames: []string{"other_prefix/.*", ".*/other_suffix"},
},
},
},
},
}
for _, test := range tests {
t.Run(test.filterName, func(t *testing.T) {
cfg := config.Processors[test.filterName]
assert.Equal(t, test.expCfg, cfg)
})
}
}
18 changes: 18 additions & 0 deletions processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor))
}

Expand Down Expand Up @@ -63,3 +64,20 @@ func createMetricsProcessor(
fp,
processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.TracesConsumer,
) (component.TracesProcessor, error) {
fp, err := newFilterSpanProcessor(params.Logger, cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewTraceProcessor(
cfg,
nextConsumer,
fp,
processorhelper.WithCapabilities(processorCapabilities))
}
5 changes: 2 additions & 3 deletions processor/filterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ func TestCreateProcessors(t *testing.T) {
factory := NewFactory()

tp, tErr := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
// Not implemented error
assert.NotNil(t, tErr)
assert.Nil(t, tp)
assert.Equal(t, test.succeed, tp != nil)
assert.Equal(t, test.succeed, tErr == nil)

mp, mErr := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewMetricsNop())
assert.Equal(t, test.succeed, mp != nil)
Expand Down
Loading

0 comments on commit 14009aa

Please sign in to comment.