Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit ddd126e

Browse files
committed
Add support for label selector for nodes.
1 parent c61da91 commit ddd126e

File tree

5 files changed

+157
-45
lines changed

5 files changed

+157
-45
lines changed

integration/framework.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ type kubeFramework interface {
6464
GetProxyUrlForService(service *api.Service) string
6565

6666
// Returns the node hostnames.
67-
GetNodes() ([]string, error)
67+
GetNodeNames() ([]string, error)
68+
69+
// Returns the nodes.
70+
GetNodes() (*api.NodeList, error)
6871

6972
// Returns pod names in the cluster.
7073
// TODO: Remove, or mix with namespace
@@ -410,22 +413,25 @@ func (self *realKubeFramework) GetProxyUrlForService(service *api.Service) strin
410413
return fmt.Sprintf("%s/api/v1/proxy/namespaces/default/services/%s/", self.masterIP, service.Name)
411414
}
412415

413-
func (self *realKubeFramework) GetNodes() ([]string, error) {
416+
func (self *realKubeFramework) GetNodeNames() ([]string, error) {
414417
var nodes []string
415-
nodeList, err := self.kubeClient.Nodes().List(api.ListOptions{
416-
LabelSelector: labels.Everything(),
417-
FieldSelector: fields.Everything(),
418-
})
418+
nodeList, err := self.GetNodes()
419419
if err != nil {
420420
return nodes, err
421421
}
422-
423422
for _, node := range nodeList.Items {
424423
nodes = append(nodes, node.Name)
425424
}
426425
return nodes, nil
427426
}
428427

428+
func (self *realKubeFramework) GetNodes() (*api.NodeList, error) {
429+
return self.kubeClient.Nodes().List(api.ListOptions{
430+
LabelSelector: labels.Everything(),
431+
FieldSelector: fields.Everything(),
432+
})
433+
}
434+
429435
func (self *realKubeFramework) GetAllRunningPods() ([]api.Pod, error) {
430436
return getRunningPods(true, self.kubeClient)
431437
}

integration/heapster_api_test.go

+90-22
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func removeHeapsterImage(fm kubeFramework, zone string) error {
111111
} else {
112112
glog.V(2).Infof("Heapster image removed.")
113113
}
114-
if nodes, err := fm.GetNodes(); err == nil {
114+
if nodes, err := fm.GetNodeNames(); err == nil {
115115
for _, node := range nodes {
116116
host := strings.Split(node, ".")[0]
117117
cleanupRemoteHost(host, zone)
@@ -168,7 +168,7 @@ func buildAndPushDockerImages(fm kubeFramework, zone string) error {
168168
if *avoidBuild {
169169
return nil
170170
}
171-
nodes, err := fm.GetNodes()
171+
nodes, err := fm.GetNodeNames()
172172
if err != nil {
173173
return err
174174
}
@@ -247,7 +247,7 @@ func runMetricExportTest(fm kubeFramework, svc *kube_api.Service) error {
247247
}
248248
glog.V(0).Infof("Expected pods: %v", expectedPods)
249249

250-
expectedNodes, err := fm.GetNodes()
250+
expectedNodes, err := fm.GetNodeNames()
251251
if err != nil {
252252
return err
253253
}
@@ -530,7 +530,7 @@ func runModelTest(fm kubeFramework, svc *kube_api.Service) error {
530530
if len(podList) == 0 {
531531
return fmt.Errorf("empty pod list")
532532
}
533-
nodeList, err := fm.GetNodes()
533+
nodeList, err := fm.GetNodeNames()
534534
if err != nil {
535535
return err
536536
}
@@ -624,6 +624,14 @@ func runModelTest(fm kubeFramework, svc *kube_api.Service) error {
624624
return nil
625625
}
626626

627+
const (
628+
apiPrefix = "apis"
629+
metricsApiGroupName = "metrics"
630+
metricsApiVersion = "v1alpha1"
631+
)
632+
633+
var baseMetricsUrl = fmt.Sprintf("%s/%s/%s", apiPrefix, metricsApiGroupName, metricsApiVersion)
634+
627635
func checkUsage(res kube_v1.ResourceList) error {
628636
if _, found := res[kube_v1.ResourceCPU]; !found {
629637
return fmt.Errorf("Cpu not found")
@@ -635,7 +643,7 @@ func checkUsage(res kube_v1.ResourceList) error {
635643
}
636644

637645
func getPodMetrics(fm kubeFramework, svc *kube_api.Service, pod kube_api.Pod) (*metrics_api.PodMetrics, error) {
638-
url := fmt.Sprintf("apis/metrics/v1alpha1/namespaces/%s/pods/%s", pod.Namespace, pod.Name)
646+
url := fmt.Sprintf("%s/namespaces/%s/pods/%s", baseMetricsUrl, pod.Namespace, pod.Name)
639647
body, err := getDataFromProxy(fm, svc, url)
640648
if err != nil {
641649
return nil, err
@@ -649,18 +657,18 @@ func getPodMetrics(fm kubeFramework, svc *kube_api.Service, pod kube_api.Pod) (*
649657
}
650658

651659
func getAllPodsInNamespaceMetrics(fm kubeFramework, svc *kube_api.Service, namespace string) (metrics_api.PodMetricsList, error) {
652-
url := fmt.Sprintf("apis/metrics/v1alpha1/namespaces/%s/pods/", namespace)
660+
url := fmt.Sprintf("%s/namespaces/%s/pods/", baseMetricsUrl, namespace)
653661
return getPodMetricsList(fm, svc, url, &labelSelectorEverything)
654662
}
655663

656664
func getAllPodsMetrics(fm kubeFramework, svc *kube_api.Service) (metrics_api.PodMetricsList, error) {
657-
url := "apis/metrics/v1alpha1/pods/"
665+
url := fmt.Sprintf("%s/pods/", baseMetricsUrl)
658666
selector := labels.Everything()
659667
return getPodMetricsList(fm, svc, url, &selector)
660668
}
661669

662-
func getLabelSelectedPodsMetrics(fm kubeFramework, svc *kube_api.Service, namespace string, labelSelector *labels.Selector) (metrics_api.PodMetricsList, error) {
663-
url := fmt.Sprintf("apis/metrics/v1alpha1/namespaces/%s/pods/", namespace)
670+
func getLabelSelectedPodMetrics(fm kubeFramework, svc *kube_api.Service, namespace string, labelSelector *labels.Selector) (metrics_api.PodMetricsList, error) {
671+
url := fmt.Sprintf("%s/namespaces/%s/pods/", baseMetricsUrl, namespace)
664672
return getPodMetricsList(fm, svc, url, labelSelector)
665673
}
666674

@@ -696,7 +704,7 @@ func checkSinglePodMetrics(metrics *metrics_api.PodMetrics, pod *kube_api.Pod) e
696704
}
697705

698706
func getSingleNodeMetrics(fm kubeFramework, svc *kube_api.Service, node string) (*metrics_api.NodeMetrics, error) {
699-
url := fmt.Sprintf("apis/metrics/v1alpha1/nodes/%s", node)
707+
url := fmt.Sprintf("%s/nodes/%s", baseMetricsUrl, node)
700708
body, err := getDataFromProxy(fm, svc, url)
701709
if err != nil {
702710
return nil, err
@@ -709,8 +717,8 @@ func getSingleNodeMetrics(fm kubeFramework, svc *kube_api.Service, node string)
709717
return &data, nil
710718
}
711719

712-
func getNodeMetricsList(fm kubeFramework, svc *kube_api.Service, url string) (metrics_api.NodeMetricsList, error) {
713-
body, err := getDataFromProxy(fm, svc, url)
720+
func getNodeMetricsList(fm kubeFramework, svc *kube_api.Service, url string, labelSelector *labels.Selector) (metrics_api.NodeMetricsList, error) {
721+
body, err := getDataFromProxyWithSelector(fm, svc, url, labelSelector)
714722
if err != nil {
715723
return metrics_api.NodeMetricsList{}, err
716724
}
@@ -722,13 +730,19 @@ func getNodeMetricsList(fm kubeFramework, svc *kube_api.Service, url string) (me
722730
return data, nil
723731
}
724732

733+
func getLabelSelectedNodeMetrics(fm kubeFramework, svc *kube_api.Service, labelSelector *labels.Selector) (metrics_api.NodeMetricsList, error) {
734+
url := fmt.Sprintf("%s/nodes", baseMetricsUrl)
735+
return getNodeMetricsList(fm, svc, url, labelSelector)
736+
}
737+
725738
func getAllNodeMetrics(fm kubeFramework, svc *kube_api.Service) (metrics_api.NodeMetricsList, error) {
726-
url := "apis/metrics/v1alpha1/nodes/"
727-
return getNodeMetricsList(fm, svc, url)
739+
url := fmt.Sprintf("%s/nodes", baseMetricsUrl)
740+
selector := labels.Everything()
741+
return getNodeMetricsList(fm, svc, url, &selector)
728742
}
729743

730744
func runSingleNodeMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
731-
nodeList, err := fm.GetNodes()
745+
nodeList, err := fm.GetNodeNames()
732746
if err != nil {
733747
return err
734748
}
@@ -751,11 +765,55 @@ func runSingleNodeMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error
751765
return nil
752766
}
753767

754-
func runAllNodesMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
768+
func runLabelSelectorNodeMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
755769
nodeList, err := fm.GetNodes()
756770
if err != nil {
757771
return err
758772
}
773+
if len(nodeList.Items) == 0 {
774+
return fmt.Errorf("empty node list")
775+
}
776+
labelMap := make(map[string]map[string]kube_api.Node)
777+
for _, n := range nodeList.Items {
778+
for label, value := range n.Labels {
779+
selector := label + "=" + value
780+
if _, found := labelMap[selector]; !found {
781+
labelMap[selector] = make(map[string]kube_api.Node)
782+
}
783+
labelMap[selector][n.Name] = n
784+
}
785+
}
786+
787+
for selector, nodesWithLabel := range labelMap {
788+
sel, err := labels.Parse(selector)
789+
if err != nil {
790+
return err
791+
}
792+
metrics, err := getLabelSelectedNodeMetrics(fm, svc, &sel)
793+
if err != nil {
794+
return err
795+
}
796+
if len(metrics.Items) != len(nodesWithLabel) {
797+
return fmt.Errorf("Wrong number of label selected node metrics: expected %v, got %v", len(nodesWithLabel), len(metrics.Items))
798+
}
799+
for _, nodeMetric := range metrics.Items {
800+
node := nodesWithLabel[nodeMetric.Name]
801+
if nodeMetric.Name != node.Name {
802+
return fmt.Errorf("Wrong node name: expected %v, got %v", node.Name, nodeMetric.Name)
803+
}
804+
if err := checkUsage(nodeMetric.Usage); err != nil {
805+
return err
806+
}
807+
}
808+
}
809+
return nil
810+
}
811+
812+
func runAllNodesMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
813+
nodeList, err := fm.GetNodeNames()
814+
if err != nil {
815+
return err
816+
}
759817
if len(nodeList) == 0 {
760818
return fmt.Errorf("empty node list")
761819
}
@@ -868,7 +926,7 @@ func runAllPodsMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
868926
return nil
869927
}
870928

871-
func runLabelSelectorMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
929+
func runLabelSelectorPodMetricsApiTest(fm kubeFramework, svc *kube_api.Service) error {
872930
podList, err := fm.GetAllRunningPods()
873931
if err != nil {
874932
return err
@@ -897,7 +955,7 @@ func runLabelSelectorMetricsApiTest(fm kubeFramework, svc *kube_api.Service) err
897955
if err != nil {
898956
return err
899957
}
900-
metrics, err := getLabelSelectedPodsMetrics(fm, svc, ns, &sel)
958+
metrics, err := getLabelSelectedPodMetrics(fm, svc, ns, &sel)
901959
if err != nil {
902960
return err
903961
}
@@ -994,12 +1052,12 @@ func apiTest(kubeVersion string, zone string) error {
9941052
return err
9951053
},
9961054
func() error {
997-
glog.V(2).Infof("Metrics API test - label selector")
998-
err := runLabelSelectorMetricsApiTest(fm, svc)
1055+
glog.V(2).Infof("Metrics API test - label selector for pods")
1056+
err := runLabelSelectorPodMetricsApiTest(fm, svc)
9991057
if err == nil {
1000-
glog.V(2).Infof("Metrics API test - label selector: OK")
1058+
glog.V(2).Infof("Metrics API test - label selector for pods: OK")
10011059
} else {
1002-
glog.V(2).Infof("Metrics API test - label selector: error: %v", err)
1060+
glog.V(2).Infof("Metrics API test - label selector for pods: error: %v", err)
10031061
}
10041062
return err
10051063
},
@@ -1013,6 +1071,16 @@ func apiTest(kubeVersion string, zone string) error {
10131071
}
10141072
return err
10151073
},
1074+
func() error {
1075+
glog.V(2).Infof("Metrics API test - label selector for nodes")
1076+
err := runLabelSelectorNodeMetricsApiTest(fm, svc)
1077+
if err == nil {
1078+
glog.V(2).Infof("Metrics API test - label selector for nodes: OK")
1079+
} else {
1080+
glog.V(2).Infof("Metrics API test - label selector for nodes: error: %v", err)
1081+
}
1082+
return err
1083+
},
10161084
func() error {
10171085
glog.V(2).Infof("Metrics API test - all nodes")
10181086
err := runAllNodesMetricsApiTest(fm, svc)

metrics/apis/metrics/handlers.go

+30-4
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ import (
4040
type Api struct {
4141
metricSink *metricsink.MetricSink
4242
podLister *cache.StoreToPodLister
43+
nodeLister *cache.StoreToNodeLister
4344
}
4445

45-
func NewApi(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister) *Api {
46+
func NewApi(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister) *Api {
4647
return &Api{
4748
metricSink: metricSink,
4849
podLister: podLister,
50+
nodeLister: nodeLister,
4951
}
5052
}
5153

@@ -58,7 +60,8 @@ func (a *Api) Register(container *restful.Container) {
5860
ws.Route(ws.GET("/nodes/").
5961
To(a.nodeMetricsList).
6062
Doc("Get a list of metrics for all available nodes.").
61-
Operation("nodeMetricsList"))
63+
Operation("nodeMetricsList")).
64+
Param(ws.QueryParameter("labelSelector", "A selector to restrict the list of returned objects by their labels. Defaults to everything.").DataType("string"))
6265

6366
ws.Route(ws.GET("/nodes/{node-name}/").
6467
To(a.nodeMetrics).
@@ -89,9 +92,32 @@ func (a *Api) Register(container *restful.Container) {
8992
}
9093

9194
func (a *Api) nodeMetricsList(request *restful.Request, response *restful.Response) {
95+
selector := request.QueryParameter("labelSelector")
96+
97+
labelSelector, err := labels.Parse(selector)
98+
if err != nil {
99+
errMsg := fmt.Errorf("Error while parsing selector %v: %v", selector, err)
100+
glog.Error(errMsg)
101+
response.WriteError(http.StatusBadRequest, errMsg)
102+
return
103+
}
104+
105+
nodes, err := a.nodeLister.NodeCondition(func(node *kube_api.Node) bool {
106+
if labelSelector.Empty() {
107+
return true
108+
}
109+
return labelSelector.Matches(labels.Set(node.Labels))
110+
}).List()
111+
if err != nil {
112+
errMsg := fmt.Errorf("Error while listing nodes: %v", err)
113+
glog.Error(errMsg)
114+
response.WriteError(http.StatusInternalServerError, errMsg)
115+
return
116+
}
117+
92118
res := v1alpha1.NodeMetricsList{}
93-
for _, node := range a.metricSink.GetNodes() {
94-
if m := a.getNodeMetrics(node); m != nil {
119+
for _, node := range nodes {
120+
if m := a.getNodeMetrics(node.Name); m != nil {
95121
res.Items = append(res.Items, *m)
96122
}
97123
}

metrics/handlers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131

3232
const pprofBasePath = "/debug/pprof/"
3333

34-
func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, historicalSource core.HistoricalSource) http.Handler {
34+
func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler {
3535

3636
runningInKubernetes := true
3737

@@ -42,7 +42,7 @@ func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPo
4242
a := v1.NewApi(runningInKubernetes, metricSink, historicalSource)
4343
a.Register(wsContainer)
4444
// Metrics API
45-
m := metricsApi.NewApi(metricSink, podLister)
45+
m := metricsApi.NewApi(metricSink, podLister, nodeLister)
4646
m.Register(wsContainer)
4747

4848
handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {

0 commit comments

Comments
 (0)