-
Notifications
You must be signed in to change notification settings - Fork 125
/
Copy pathkafka_channel_lifecycle.go
149 lines (115 loc) · 5.86 KB
/
kafka_channel_lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
import (
"sync"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
// The consolidated and distributed KafkaChannel implementations require
// differentiated ConditionSets in order to accurately reflect their varied
// runtime architectures. One of the channel specific "Register..." functions
// in pkg/channel/<type>/apis/messaging/kafka_channel_lifecycle.go should be
// called via an init() in the main() of associated components.
var kc apis.ConditionSet
var channelCondSetLock = sync.RWMutex{}
// Shared / Common Conditions Used By All Channel Implementations
const (
// KafkaChannelConditionReady has status True when all sub-conditions below have been set to True.
KafkaChannelConditionReady = apis.ConditionReady
// KafkaChannelConditionAddressable has status true when this KafkaChannel meets
// the Addressable contract and has a non-empty URL.
KafkaChannelConditionAddressable apis.ConditionType = "Addressable"
// KafkaChannelConditionConfigReady has status True when the Kafka configuration to use by the channel
// exists and is valid (i.e. the connection has been established).
KafkaChannelConditionConfigReady apis.ConditionType = "ConfigurationReady"
// KafkaChannelConditionTopicReady has status True when the Kafka topic to use by the channel exists.
KafkaChannelConditionTopicReady apis.ConditionType = "TopicReady"
// KafkaChannelConditionChannelServiceReady has status True when the K8S Service representing the channel
// is ready. Because this uses ExternalName, there are no endpoints to check.
KafkaChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady"
ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)
// RegisterAlternateKafkaChannelConditionSet register a different apis.ConditionSet.
func RegisterAlternateKafkaChannelConditionSet(conditionSet apis.ConditionSet) {
channelCondSetLock.Lock()
defer channelCondSetLock.Unlock()
kc = conditionSet
}
// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (*KafkaChannel) GetConditionSet() apis.ConditionSet {
channelCondSetLock.RLock()
defer channelCondSetLock.RUnlock()
return kc
}
// GetConditionSet retrieves the condition set for this resource.
func (*KafkaChannelStatus) GetConditionSet() apis.ConditionSet {
channelCondSetLock.RLock()
defer channelCondSetLock.RUnlock()
return kc
}
// GetCondition returns the condition currently associated with the given type, or nil.
func (kcs *KafkaChannelStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return kcs.GetConditionSet().Manage(kcs).GetCondition(t)
}
// IsReady returns true if the resource is ready overall.
func (kcs *KafkaChannelStatus) IsReady() bool {
return kcs.GetConditionSet().Manage(kcs).IsHappy()
}
// InitializeConditions sets relevant unset conditions to Unknown state.
func (kcs *KafkaChannelStatus) InitializeConditions() {
kcs.GetConditionSet().Manage(kcs).InitializeConditions()
}
// SetAddress sets the address (as part of Addressable contract) and marks the correct condition.
func (kcs *KafkaChannelStatus) SetAddress(addr *duckv1.Addressable) {
if kcs.Address == nil {
kcs.Address = &duckv1.Addressable{}
}
if addr != nil {
kcs.Address = addr
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionAddressable)
} else {
kcs.Address.URL = nil
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionAddressable, "EmptyURL", "URL is nil")
}
}
func (kcs *KafkaChannelStatus) MarkConfigTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionConfigReady)
}
func (kcs *KafkaChannelStatus) MarkConfigFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionConfigReady, reason, messageFormat, messageA...)
}
func (kcs *KafkaChannelStatus) MarkTopicTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionTopicReady)
}
func (kcs *KafkaChannelStatus) MarkTopicFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionTopicReady, reason, messageFormat, messageA...)
}
func (kcs *KafkaChannelStatus) MarkChannelServiceFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionChannelServiceReady, reason, messageFormat, messageA...)
}
func (kcs *KafkaChannelStatus) MarkChannelServiceTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionChannelServiceReady)
}
func (kcs *KafkaChannelStatus) MarkEventPoliciesTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(ConditionEventPoliciesReady)
}
func (kcs *KafkaChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
func (kcs *KafkaChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
func (kcs *KafkaChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}