Skip to content

Commit de1f5a0

Browse files
authoredAug 4, 2022
Merge pull request kubernetes-retired#314 from gauravkghildiyal/ipvs
Change endpoint key and use anonymous set depending on presence of PodName
2 parents 5d30e85 + e892512 commit de1f5a0

File tree

7 files changed

+118
-76
lines changed

7 files changed

+118
-76
lines changed
 

‎api/localnetv1/services.pb.go

+76-66
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎api/localnetv1/services.proto

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ message EndpointInfo {
162162
string SourceName = 3;
163163
string ServiceName = 4;
164164
string NodeName = 5;
165+
string PodName = 9;
165166

166167
Endpoint Endpoint = 6;
167168
EndpointConditions Conditions = 7;

‎api/localnetv1/services_grpc.pb.go

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎client/serviceevents/service-events.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ type SessionAffinityListener interface {
5858
// - AddIPPort
5959
// - DeleteIPPort
6060
// - DeleteIP
61-
//
6261
type ServicesListener struct {
6362
PortsListener PortsListener
6463
IPsListener IPsListener
@@ -72,7 +71,6 @@ type ServicesListener struct {
7271
// New creates a new ServicesListener.
7372
//
7473
// Reminder: you need to associate listeners for this listener to be useful.
75-
//
7674
func New() *ServicesListener {
7775
return &ServicesListener{
7876
services: map[string]*localnetv1.Service{},
@@ -251,7 +249,12 @@ func (sl *ServicesListener) diff(prevSvc, currSvc *localnetv1.Service) {
251249
}
252250

253251
func samePort(p1, p2 *localnetv1.PortMapping) bool {
254-
return p1.Protocol == p2.Protocol && p1.Port == p2.Port
252+
return p1.Name == p2.Name &&
253+
p1.Protocol == p2.Protocol &&
254+
p1.Port == p2.Port &&
255+
p1.NodePort == p2.NodePort &&
256+
p1.TargetPort == p2.TargetPort &&
257+
p1.TargetPortName == p2.TargetPortName
255258
}
256259

257260
// SessionAffinity contains data about assinged session affinity

‎server/jobs/kube2store/endpoints-event-handler.go

+4
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ func (h *endpointsEventHandler) OnAdd(obj interface{}) {
7070
}
7171
}
7272

73+
if t := addr.TargetRef; t != nil && t.Kind == "Pod" {
74+
info.PodName = t.Name
75+
}
76+
7377
if addr.IP != "" {
7478
info.Endpoint.AddAddress(addr.IP)
7579
}

‎server/jobs/kube2store/slice-event-handler.go

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ func (h sliceEventHandler) OnAdd(obj interface{}) {
6060
info.NodeName = sliceEndpoint.Topology[hostNameLabel]
6161
}
6262

63+
if t := sliceEndpoint.TargetRef; t != nil && t.Kind == "Pod" {
64+
info.PodName = t.Name
65+
}
66+
6367
if h := sliceEndpoint.Hostname; h != nil {
6468
info.Endpoint.Hostname = *h
6569
}

‎server/jobs/store2localdiff/store2localdiff.go

+23-7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func (j *Job) Run(ctx context.Context) error {
4646
Sets: []localnetv1.Set{
4747
localnetv1.Set_ServicesSet,
4848
localnetv1.Set_EndpointsSet,
49+
localnetv1.Set_EndpointsSet, // 2nd endpoints set for endpoints which do not have a corresponding pod name
4950
},
5051
Sink: run,
5152
}
@@ -77,6 +78,7 @@ func (s *jobRun) Update(tx *proxystore.Tx, w *watchstate.WatchState) {
7778

7879
svcs := w.StoreFor(localnetv1.Set_ServicesSet)
7980
seps := w.StoreFor(localnetv1.Set_EndpointsSet)
81+
sepsAnonymous := w.StoreForN(localnetv1.Set_EndpointsSet, 1)
8082

8183
// set all new values
8284
tx.Each(proxystore.Services, func(kv *proxystore.KV) bool {
@@ -94,30 +96,44 @@ func (s *jobRun) Update(tx *proxystore.Tx, w *watchstate.WatchState) {
9496
// hash only the endpoint
9597
hash := serde.Hash(ei.Endpoint)
9698

97-
// key is service key + endpoint hash (64 bits, in hex)
98-
key := append(make([]byte, 0, len(key)+1+64/8*2), key...)
99-
key = append(key, '/')
100-
key = strconv.AppendUint(key, hash, 16)
99+
var epKey []byte
100+
var set *lightdiffstore.DiffStore
101+
102+
if ei.PodName == "" {
103+
set = sepsAnonymous
104+
// key is service key + endpoint hash (64 bits, in hex)
105+
epKey = append(make([]byte, 0, len(key)+1+64/8*2), key...)
106+
epKey = append(epKey, '/')
107+
epKey = strconv.AppendUint(epKey, hash, 16)
108+
} else {
109+
set = seps
110+
// key is service key + podName
111+
epKey = append(make([]byte, 0, len(key)+1+len(ei.PodName)), key...)
112+
epKey = append(epKey, '/')
113+
epKey = append(epKey, []byte(ei.PodName)...)
114+
}
101115

102116
if trace.IsEnabled() {
103-
trace.Log(ctx, "endpoint", string(key))
117+
trace.Log(ctx, "endpoint", string(epKey))
104118
}
105119

106-
seps.Set(key, hash, ei.Endpoint)
120+
set.Set(epKey, hash, ei.Endpoint)
107121
}
108122

109123
return true
110124
})
111125
}
112126

113-
func (_ *jobRun) SendDiff(w *watchstate.WatchState) (updated bool) {
127+
func (*jobRun) SendDiff(w *watchstate.WatchState) (updated bool) {
114128
_, task := trace.NewTask(context.Background(), "LocalState.SendDiff")
115129
defer task.End()
116130

117131
count := 0
118132
count += w.SendUpdates(localnetv1.Set_ServicesSet)
133+
count += w.SendDeletesN(localnetv1.Set_EndpointsSet, 1)
119134
count += w.SendUpdates(localnetv1.Set_EndpointsSet)
120135
count += w.SendDeletes(localnetv1.Set_EndpointsSet)
136+
count += w.SendUpdatesN(localnetv1.Set_EndpointsSet, 1)
121137
count += w.SendDeletes(localnetv1.Set_ServicesSet)
122138

123139
w.Reset(lightdiffstore.ItemDeleted)

0 commit comments

Comments
 (0)
Please sign in to comment.