Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: fix not use manage_host when connect to components again #2207

Merged
merged 2 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/dm/command/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func clearOutDatedEtcdInfo(clusterName string, metadata *spec.Metadata, opt oper
if err != nil {
return err
}
dmMasterClient := api.NewDMMasterClient(topo.GetMasterList(), 10*time.Second, tlsCfg)
dmMasterClient := api.NewDMMasterClient(topo.GetMasterListWithManageHost(), 10*time.Second, tlsCfg)
registeredMasters, registeredWorkers, err := dmMasterClient.GetRegisteredMembers()
if err != nil {
return err
Expand Down
12 changes: 8 additions & 4 deletions components/dm/spec/topology_dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (s *Specification) BaseTopo() *spec.BaseTopo {
return &spec.BaseTopo{
GlobalOptions: &s.GlobalOptions,
MonitoredOptions: s.GetMonitoredOptions(),
MasterList: s.GetMasterList(),
MasterList: s.GetMasterListWithManageHost(),
Monitors: s.Monitors,
Grafanas: s.Grafanas,
Alertmanagers: s.Alertmanagers,
Expand All @@ -701,12 +701,16 @@ func (s *Specification) MergeTopo(rhs spec.Topology) spec.Topology {
return s.Merge(other)
}

// GetMasterList returns a list of Master API hosts of the current cluster
func (s *Specification) GetMasterList() []string {
// GetMasterListWithManageHost returns a list of Master API hosts of the current cluster
func (s *Specification) GetMasterListWithManageHost() []string {
var masterList []string

for _, master := range s.Masters {
masterList = append(masterList, utils.JoinHostPort(master.Host, master.Port))
host := master.Host
if master.ManageHost != "" {
host = master.ManageHost
}
masterList = append(masterList, utils.JoinHostPort(host, master.Port))
}

return masterList
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func (m *Manager) checkRegionsInfo(clusterName string, topo *spec.Specification,
}
pdClient := api.NewPDClient(
context.WithValue(context.TODO(), logprinter.ContextKeyLogger, m.logger),
topo.GetPDList(),
topo.GetPDListWithManageHost(),
time.Second*time.Duration(gOpt.APITimeout),
tlsConfig,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error {
continue
}
if strings.HasPrefix(v.Status, "Up") || strings.HasPrefix(v.Status, "Healthy") {
instAddr := utils.JoinHostPort(v.Host, v.Port)
instAddr := utils.JoinHostPort(v.ManageHost, v.Port)
masterActive = append(masterActive, instAddr)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/operation/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func DestroyClusterTombstone(
pdEndpoints = strings.Split(forcePDEndpoints, ",")
logger.Warnf("%s is set, using %s as PD endpoints", EnvNamePDEndpointOverwrite, pdEndpoints)
} else {
pdEndpoints = cluster.GetPDList()
pdEndpoints = cluster.GetPDListWithManageHost()
}

var pdClient = api.NewPDClient(ctx, pdEndpoints, 10*time.Second, tlsCfg)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func Upgrade(
pdEndpoints = strings.Split(forcePDEndpoints, ",")
logger.Warnf("%s is set, using %s as PD endpoints", EnvNamePDEndpointOverwrite, pdEndpoints)
} else {
pdEndpoints = topo.(*spec.Specification).GetPDList()
pdEndpoints = topo.(*spec.Specification).GetPDListWithManageHost()
}
pdClient := api.NewPDClient(ctx, pdEndpoints, 10*time.Second, tlsCfg)
origLeaderScheduleLimit, origRegionScheduleLimit, err = increaseScheduleLimit(ctx, pdClient)
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func (s *AlertmanagerSpec) GetMainPort() int {
return s.WebPort
}

// GetManageHost returns the manage host of the instance
func (s *AlertmanagerSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *AlertmanagerSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -117,10 +125,10 @@ func (c *AlertManagerComponent) Instances() []Instance {
s.DataDir,
},
StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string {
return statusByHost(s.Host, s.WebPort, "/-/ready", timeout, nil)
return statusByHost(s.GetManageHost(), s.WebPort, "/-/ready", timeout, nil)
},
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.WebPort, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.WebPort, timeout, tlsCfg)
},
},
topo: c.Topology,
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func (s *CDCSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *CDCSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *CDCSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -116,10 +124,10 @@ func (c *CDCComponent) Instances() []Instance {
s.DeployDir,
},
StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string {
return statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg)
return statusByHost(s.GetManageHost(), s.Port, "/status", timeout, tlsCfg)
},
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
}, c.Topology}
if s.DataDir != "" {
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *DashboardSpec) Status(ctx context.Context, timeout time.Duration, tlsCf
timeout = statusQueryTimeout
}

state := statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg)
state := statusByHost(s.GetManageHost(), s.Port, "/status", timeout, tlsCfg)

return state
}
Expand All @@ -75,6 +75,14 @@ func (s *DashboardSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *DashboardSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *DashboardSpec) IsImported() bool {
// TiDB-Ansible do not support dashboard
Expand Down Expand Up @@ -121,7 +129,7 @@ func (c *DashboardComponent) Instances() []Instance {
},
StatusFn: s.Status,
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
}, c.Topology})
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *DrainerSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg
timeout = statusQueryTimeout
}

state := statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg)
state := statusByHost(s.GetManageHost(), s.Port, "/status", timeout, tlsCfg)

if s.Offline {
binlogClient, err := api.NewBinlogClient(pdList, timeout, tlsCfg)
Expand Down Expand Up @@ -94,6 +94,14 @@ func (s *DrainerSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *DrainerSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *DrainerSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -139,7 +147,7 @@ func (c *DrainerComponent) Instances() []Instance {
},
StatusFn: s.Status,
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
}, c.Topology})
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func (s *GrafanaSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *GrafanaSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *GrafanaSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -122,10 +130,10 @@ func (c *GrafanaComponent) Instances() []Instance {
s.DeployDir,
},
StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string {
return statusByHost(s.Host, s.Port, "/login", timeout, nil)
return statusByHost(s.GetManageHost(), s.Port, "/login", timeout, nil)
},
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
},
topo: c.Topology,
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func (s *PrometheusSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *PrometheusSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *PrometheusSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -139,10 +147,10 @@ func (c *MonitorComponent) Instances() []Instance {
s.DataDir,
},
StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string {
return statusByHost(s.Host, s.Port, "/-/ready", timeout, nil)
return statusByHost(s.GetManageHost(), s.Port, "/-/ready", timeout, nil)
},
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
}, c.Topology}
if s.NgPort > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *PDComponent) Instances() []Instance {
},
StatusFn: s.Status,
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.ClientPort, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.ClientPort, timeout, tlsCfg)
},
},
topo: c.Topology,
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *PumpSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tl
timeout = statusQueryTimeout
}

state := statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg)
state := statusByHost(s.GetManageHost(), s.Port, "/status", timeout, tlsCfg)

if s.Offline {
binlogClient, err := api.NewBinlogClient(pdList, timeout, tlsCfg)
Expand Down Expand Up @@ -93,6 +93,14 @@ func (s *PumpSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *PumpSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *PumpSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -138,7 +146,7 @@ func (c *PumpComponent) Instances() []Instance {
},
StatusFn: s.Status,
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
}, c.Topology})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *Specification) BaseTopo() *BaseTopo {
return &BaseTopo{
GlobalOptions: &s.GlobalOptions,
MonitoredOptions: s.GetMonitoredOptions(),
MasterList: s.GetPDList(),
MasterList: s.GetPDListWithManageHost(),
Monitors: s.Monitors,
Grafanas: s.Grafanas,
Alertmanagers: s.Alertmanagers,
Expand Down Expand Up @@ -475,14 +475,14 @@ func (s *Specification) GetDashboardAddress(ctx context.Context, tlsCfg *tls.Con
// GetEtcdClient loads EtcdClient of current cluster
func (s *Specification) GetEtcdClient(tlsCfg *tls.Config) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: s.GetPDList(),
Endpoints: s.GetPDListWithManageHost(),
TLS: tlsCfg,
})
}

// GetEtcdProxyClient loads EtcdClient of current cluster with TCP proxy
func (s *Specification) GetEtcdProxyClient(tlsCfg *tls.Config, tcpProxy *proxy.TCPProxy) (*clientv3.Client, chan struct{}, error) {
closeC := tcpProxy.Run(s.GetPDList())
closeC := tcpProxy.Run(s.GetPDListWithManageHost())
cli, err := clientv3.New(clientv3.Config{
Endpoints: tcpProxy.GetEndpoints(),
TLS: tlsCfg,
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (s *TiDBSpec) GetMainPort() int {
return s.Port
}

// GetManageHost returns the manage host of the instance
func (s *TiDBSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *TiDBSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -115,10 +123,10 @@ func (c *TiDBComponent) Instances() []Instance {
s.DeployDir,
},
StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string {
return statusByHost(s.Host, s.StatusPort, "/status", timeout, tlsCfg)
return statusByHost(s.GetManageHost(), s.StatusPort, "/status", timeout, tlsCfg)
},
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.StatusPort, timeout, tlsCfg)
},
}, c.Topology})
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/cluster/spec/tiflash.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func (s *TiFlashSpec) GetMainPort() int {
return s.TCPPort
}

// GetManageHost returns the manage host of the instance
func (s *TiFlashSpec) GetManageHost() string {
if s.ManageHost != "" {
return s.ManageHost
}
return s.Host
}

// IsImported returns if the node is imported from TiDB-Ansible
func (s *TiFlashSpec) IsImported() bool {
return s.Imported
Expand Down Expand Up @@ -291,7 +299,7 @@ func (c *TiFlashComponent) Instances() []Instance {
},
StatusFn: s.Status,
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg)
return UptimeByHost(s.GetManageHost(), s.StatusPort, timeout, tlsCfg)
},
}, c.Topology})
}
Expand Down Expand Up @@ -876,7 +884,7 @@ func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout ui
if i.topo.BaseTopo().GlobalOptions.TLSEnabled {
scheme = "https"
}
addr := fmt.Sprintf("%s://%s/tiflash/store-status", scheme, utils.JoinHostPort(i.Host, i.GetStatusPort()))
addr := fmt.Sprintf("%s://%s/tiflash/store-status", scheme, utils.JoinHostPort(i.GetManageHost(), i.GetStatusPort()))
req, err := http.NewRequest("GET", addr, nil)
if err != nil {
return err
Expand Down
Loading