Skip to content

Commit 241e6a7

Browse files
authored
Support for EventPolicy filters (#4086)
* Move filters from dispatcher to core module * Move contract filter converter from ConsumerVerticleBuilder to filter package * Update benchmark module to reflect filter package movements * Run update-codegen * AuthenticationHandler -> AuthHandler * Move OIDC discovery from Receiver to TokenVerifierImpl * Rework tokenVerifier AuthZ to read the cloudevent only once from the request when policies claims matched * Rename TokenVerifier to AuthVerifier * Provision EventPolicy filters in contract * Add e2e test * Run hack/update-codegen.sh * Fix: Always pass cloudevent (can be null) to ingresshandler
1 parent 268a574 commit 241e6a7

File tree

55 files changed

+977
-463
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+977
-463
lines changed

control-plane/pkg/core/config/utils.go

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
9090
}
9191
}
9292

93+
for _, filter := range policy.Spec.Filters {
94+
contractPolicy.Filters = append(contractPolicy.Filters, contract.FromSubscriptionFilter(filter))
95+
}
96+
9397
eventPolicies = append(eventPolicies, contractPolicy)
9498
}
9599

control-plane/pkg/core/config/utils_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"testing"
2424
"time"
2525

26+
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
27+
2628
"google.golang.org/protobuf/encoding/protojson"
2729
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
2830
"knative.dev/eventing/pkg/apis/feature"
@@ -618,6 +620,80 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
618620
},
619621
},
620622
},
623+
}, {
624+
name: "Multiple policies with filters",
625+
applyingPolicies: []string{
626+
"policy-1",
627+
"policy-2",
628+
},
629+
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
630+
{
631+
ObjectMeta: metav1.ObjectMeta{
632+
Name: "policy-1",
633+
Namespace: "my-ns",
634+
},
635+
Spec: eventingv1alpha1.EventPolicySpec{
636+
Filters: []eventingv1.SubscriptionsAPIFilter{
637+
{
638+
CESQL: "true",
639+
},
640+
},
641+
},
642+
Status: eventingv1alpha1.EventPolicyStatus{
643+
From: []string{
644+
"from-1",
645+
},
646+
},
647+
}, {
648+
ObjectMeta: metav1.ObjectMeta{
649+
Name: "policy-2",
650+
Namespace: "my-ns",
651+
},
652+
Spec: eventingv1alpha1.EventPolicySpec{
653+
Filters: []eventingv1.SubscriptionsAPIFilter{
654+
{
655+
CESQL: "false",
656+
},
657+
},
658+
},
659+
Status: eventingv1alpha1.EventPolicyStatus{
660+
From: []string{
661+
"from-2-*",
662+
},
663+
},
664+
},
665+
},
666+
namespace: "my-ns",
667+
defaultAuthorizationMode: feature.AuthorizationDenyAll,
668+
expected: []*contract.EventPolicy{
669+
{
670+
TokenMatchers: []*contract.TokenMatcher{
671+
exactTokenMatcher("from-1"),
672+
},
673+
Filters: []*contract.DialectedFilter{
674+
{
675+
Filter: &contract.DialectedFilter_Cesql{
676+
Cesql: &contract.CESQL{
677+
Expression: "true",
678+
},
679+
},
680+
},
681+
},
682+
}, {
683+
TokenMatchers: []*contract.TokenMatcher{
684+
prefixTokenMatcher("from-2-"),
685+
},
686+
Filters: []*contract.DialectedFilter{
687+
{
688+
Filter: &contract.DialectedFilter_Cesql{
689+
Cesql: &contract.CESQL{
690+
Expression: "false",
691+
},
692+
},
693+
},
694+
},
695+
},
696+
},
621697
}, {
622698
name: "No applying policies - allow-same-namespace default mode",
623699
applyingPolicies: []string{},

data-plane/benchmarks/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@
5454
<artifactId>dispatcher</artifactId>
5555
<version>${project.version}</version>
5656
</dependency>
57+
<dependency>
58+
<groupId>dev.knative.eventing.kafka.broker</groupId>
59+
<artifactId>core</artifactId>
60+
<version>${project.version}</version>
61+
</dependency>
62+
5763
</dependencies>
5864

5965
<build>
+5-6
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
20-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter;
21-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
22-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
23-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AllFilter;
20+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
21+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
22+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
2423
import io.cloudevents.CloudEvent;
2524
import java.util.List;
2625
import java.util.Map;
+5-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
20-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AnyFilter;
20+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
21+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
22+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
2123
import io.cloudevents.CloudEvent;
2224
import java.util.List;
2325
import java.util.Map;
+2-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
20-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
2120
import io.cloudevents.CloudEvent;
2221
import io.cloudevents.core.builder.CloudEventBuilder;
2322
import io.cloudevents.core.v1.CloudEventV1;
+1-2
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
2019
import io.cloudevents.CloudEvent;
2120
import java.util.concurrent.TimeUnit;
2221
import org.openjdk.jmh.annotations.*;
+5-6
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
20-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
21-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.NotFilter;
22-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
23-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
20+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.NotFilter;
21+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
22+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
2423
import io.cloudevents.CloudEvent;
2524
import io.cloudevents.core.v1.CloudEventV1;
2625
import java.util.Map;
+2-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
20-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
2120
import io.cloudevents.CloudEvent;
2221
import io.cloudevents.core.v1.CloudEventV1;
2322
import java.util.Map;

data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SampleEvent.java data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SampleEvent.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

1919
import io.cloudevents.CloudEvent;
2020
import io.cloudevents.core.builder.CloudEventBuilder;
+2-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
17+
package dev.knative.eventing.kafka.broker.core.filter;
1818

19-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
20-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
2120
import io.cloudevents.CloudEvent;
2221
import io.cloudevents.core.v1.CloudEventV1;
2322
import java.util.Map;

data-plane/core/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@
146146
<artifactId>vertx-opentelemetry</artifactId>
147147
</dependency>
148148

149+
<dependency>
150+
<groupId>io.cloudevents</groupId>
151+
<artifactId>cloudevents-sql</artifactId>
152+
</dependency>
149153
<dependency>
150154
<groupId>io.cloudevents</groupId>
151155
<artifactId>cloudevents-kafka</artifactId>
+1-2
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
16+
package dev.knative.eventing.kafka.broker.core.filter;
1717

1818
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
1919

20-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
2120
import io.cloudevents.CloudEvent;
2221
import io.cloudevents.core.v03.CloudEventV03;
2322
import io.cloudevents.core.v1.CloudEventV1;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright © 2018 Knative Authors ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package dev.knative.eventing.kafka.broker.core.filter;
17+
18+
import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
19+
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.*;
20+
import io.cloudevents.CloudEvent;
21+
import java.util.List;
22+
import java.util.function.Predicate;
23+
import java.util.stream.Collectors;
24+
25+
/**
26+
* This interface provides an abstraction for filtering {@link CloudEvent} instances.
27+
*/
28+
@FunctionalInterface
29+
public interface Filter extends Predicate<CloudEvent> {
30+
31+
/**
32+
* @return noop implementation that always returns true
33+
*/
34+
static Filter noop() {
35+
return ce -> true;
36+
}
37+
38+
static Filter fromContract(DataPlaneContract.DialectedFilter filter) {
39+
return switch (filter.getFilterCase()) {
40+
case EXACT -> new ExactFilter(filter.getExact().getAttributesMap());
41+
case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap());
42+
case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap());
43+
case NOT -> new NotFilter(fromContract(filter.getNot().getFilter()));
44+
case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream()
45+
.map(Filter::fromContract)
46+
.collect(Collectors.toList()));
47+
case ALL -> new AllFilter(filter.getAll().getFiltersList().stream()
48+
.map(Filter::fromContract)
49+
.collect(Collectors.toList()));
50+
case CESQL -> new CeSqlFilter(filter.getCesql().getExpression());
51+
default -> Filter.noop();
52+
};
53+
}
54+
55+
static Filter fromContract(List<DataPlaneContract.DialectedFilter> filters) {
56+
return new AllFilter(filters.stream().map(Filter::fromContract).collect(Collectors.toList()));
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
18+
import dev.knative.eventing.kafka.broker.core.filter.Filter;
1919
import io.cloudevents.CloudEvent;
2020
import java.util.List;
2121
import org.slf4j.Logger;
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
18+
import dev.knative.eventing.kafka.broker.core.filter.Filter;
1919
import io.cloudevents.CloudEvent;
2020
import java.util.List;
2121
import org.slf4j.Logger;
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
18+
import dev.knative.eventing.kafka.broker.core.filter.Filter;
1919
import io.cloudevents.CloudEvent;
2020
import io.cloudevents.sql.EvaluationContext;
2121
import io.cloudevents.sql.EvaluationException;
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
18+
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
1919
import java.util.Map;
2020

2121
public class ExactFilter extends AttributesFilter {
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
18+
import dev.knative.eventing.kafka.broker.core.filter.Filter;
1919
import io.cloudevents.CloudEvent;
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
18+
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
1919
import java.util.Map;
2020

2121
public class PrefixFilter extends AttributesFilter {
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
16+
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
1717

18-
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
18+
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
1919
import java.util.Map;
2020

2121
public class SuffixFilter extends AttributesFilter {

0 commit comments

Comments
 (0)