Skip to content

Commit

Permalink
Add keyCtx to ingestion layer
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishnayak96 committed Nov 12, 2024
1 parent afa4f8c commit 097d30d
Show file tree
Hide file tree
Showing 33 changed files with 178 additions and 216 deletions.
26 changes: 16 additions & 10 deletions ako-gateway-api/k8s/ako_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"k8s.io/client-go/tools/cache"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/google/uuid"
"github.com/vmware/alb-sdk/go/models"
akogatewayapilib "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/lib"
akogatewayapinodes "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/nodes"
akogatewayapistatus "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/status"
Expand Down Expand Up @@ -230,7 +230,9 @@ func (c *GatewayController) addIndexers() {
}

func (c *GatewayController) FullSyncK8s(sync bool) error {

// special case of boot up
ctx := context.WithValue(context.Background(), models.TraceID, "BOOT_UP")
keyCtx := akogatewayapilib.KeyContext{Ctx: ctx}
if c.DisableSync {
utils.AviLog.Infof("Sync disabled, skipping full sync")
return nil
Expand All @@ -245,8 +247,7 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {

var filteredGatewayClasses []*gatewayv1.GatewayClass
for _, gwClassObj := range gwClassObjs {
UUID := uuid.New()
key := lib.GatewayClass + "/" + utils.ObjKey(gwClassObj) + "-" + UUID.String()
key := lib.GatewayClass + "/" + utils.ObjKey(gwClassObj)
meta, err := meta.Accessor(gwClassObj)
if err == nil {
resVer := meta.GetResourceVersion()
Expand All @@ -258,7 +259,8 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {
}
for _, filteredGatewayClass := range filteredGatewayClasses {
key := lib.GatewayClass + "/" + utils.ObjKey(filteredGatewayClass)
akogatewayapinodes.DequeueIngestion(key, true)
keyCtx.KeyStr = key
akogatewayapinodes.DequeueIngestion(keyCtx, true)
}

// Gateway Section
Expand Down Expand Up @@ -288,7 +290,8 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {
})
for _, filteredGateway := range filteredGateways {
key := lib.Gateway + "/" + utils.ObjKey(filteredGateway)
akogatewayapinodes.DequeueIngestion(key, true)
keyCtx.KeyStr = key
akogatewayapinodes.DequeueIngestion(keyCtx, true)
}

// HTTPRoute Section
Expand Down Expand Up @@ -318,7 +321,8 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {
})
for _, filteredHTTPRoute := range filteredHTTPRoutes {
key := lib.HTTPRoute + "/" + utils.ObjKey(filteredHTTPRoute)
akogatewayapinodes.DequeueIngestion(key, true)
keyCtx.KeyStr = key
akogatewayapinodes.DequeueIngestion(keyCtx, true)
}

// Service Section
Expand Down Expand Up @@ -347,6 +351,7 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {
for _, podObj := range podObjs {
podLabel := utils.ObjKey(podObj)
key := utils.Pod + "/" + podLabel
keyCtx.KeyStr = key
if _, ok := podObj.GetAnnotations()[lib.NPLPodAnnotation]; !ok {
utils.AviLog.Warnf("key : %s, msg: 'nodeportlocal.antrea.io' annotation not found, ignoring the pod", key)
continue
Expand All @@ -356,7 +361,7 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {
resVer := meta.GetResourceVersion()
objects.SharedResourceVerInstanceLister().Save(key, resVer)
}
akogatewayapinodes.DequeueIngestion(key, true)
akogatewayapinodes.DequeueIngestion(keyCtx, true)
}
}

Expand Down Expand Up @@ -448,14 +453,15 @@ func SyncFromIngestionLayer(key interface{}, wg *sync.WaitGroup) error {
// NOTE: There's no error propagation from the graph layer back to the workerqueue. We will evaluate
// This condition in the future and visit as needed. But right now, there's no necessity for it.

keyStr, ok := key.(string)
keyStr, ok := key.(akogatewayapilib.KeyContext)
if !ok {
utils.AviLog.Warnf("Unexpected object type: expected string, got %T", key)
utils.AviLog.Warnf("Unexpected object type: expected KeyContext, got %T", key)
return nil
}
akogatewayapinodes.DequeueIngestion(keyStr, false)
return nil
}

func SyncFromFastRetryLayer(key interface{}, wg *sync.WaitGroup) error {
keyStr, ok := key.(string)
if !ok {
Expand Down
Loading

0 comments on commit 097d30d

Please sign in to comment.