Skip to content

Commit

Permalink
Merge branch 'master' into cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Mar 15, 2023
2 parents db3e222 + c3c127f commit abbb58d
Show file tree
Hide file tree
Showing 24 changed files with 133 additions and 40 deletions.
2 changes: 1 addition & 1 deletion components/cluster/command/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func newImportCmd() *cobra.Command {
}

// copy SSH key to TiUP profile directory
if err = tiuputils.CreateDir(spec.ClusterPath(clsName, "ssh")); err != nil {
if err = os.MkdirAll(spec.ClusterPath(clsName, "ssh"), 0755); err != nil {
return err
}
srcKeyPathPriv := ansible.SSHKeyPath()
Expand Down
2 changes: 2 additions & 0 deletions components/cluster/command/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func newReloadCmd() *cobra.Command {
cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture")
cmd.Flags().BoolVarP(&gOpt.IgnoreConfigCheck, "ignore-config-check", "", false, "Ignore the config check result")
cmd.Flags().BoolVar(&skipRestart, "skip-restart", false, "Only refresh configuration to remote and do not restart services")
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.BeforeRestartInstance.Raw, "pre-restart-script", "", "(EXPERIMENTAL) Custom script to be executed on each server before the service is restarted, does not take effect when --skip-restart is set to true")
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.AfterRestartInstance.Raw, "post-restart-script", "", "(EXPERIMENTAL) Custom script to be executed on each server after the service is restarted, does not take effect when --skip-restart is set to true")

return cmd
}
Expand Down
2 changes: 2 additions & 0 deletions components/cluster/command/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func newUpgradeCmd() *cobra.Command {
cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture")
cmd.Flags().BoolVarP(&gOpt.IgnoreConfigCheck, "ignore-config-check", "", false, "Ignore the config check result")
cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Upgrade a stopped cluster")
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.BeforeRestartInstance.Raw, "pre-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server before the server is upgraded")
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.AfterRestartInstance.Raw, "post-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server after the server is upgraded")

return cmd
}
7 changes: 5 additions & 2 deletions components/playground/instance/ticdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ type TiCDC struct {
var _ Instance = &TiCDC{}

// NewTiCDC create a TiCDC instance.
func NewTiCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiCDC {
func NewTiCDC(binPath string, dir, host, configPath string, id int, port int, pds []*PDInstance) *TiCDC {
if port <= 0 {
port = 8300
}
ticdc := &TiCDC{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8300),
Port: utils.MustGetFreePort(host, port),
ConfigPath: configPath,
},
pds: pds,
Expand Down
8 changes: 5 additions & 3 deletions components/playground/instance/tiflash_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ runAsDaemon = true
`

const tiflashMarkCacheSize = `mark_cache_size = 5368709120`

const tiflashConfig = `
default_profile = "default"
display_name = "TiFlash"
http_port = %[2]d
listen_host = "0.0.0.0"
mark_cache_size = 5368709120
path = "%[5]s"
tcp_port = %[3]d
tmp_path = "%[6]s"
%[14]s
%[13]s
[flash]
service_addr = "%[10]s:%[8]d"
Expand Down Expand Up @@ -107,11 +109,11 @@ func writeTiFlashConfig(w io.Writer, version utils.Version, tcpPort, httpPort, s
if tidbver.TiFlashNotNeedSomeConfig(version.String()) {
conf = fmt.Sprintf(tiflashConfig, pdAddrs, httpPort, tcpPort,
deployDir, dataDir, tmpDir, logDir, servicePort, metricsPort,
ip, strings.Join(tidbStatusAddrs, ","), clusterManagerPath, "")
ip, strings.Join(tidbStatusAddrs, ","), clusterManagerPath, "", "")
} else {
conf = fmt.Sprintf(tiflashConfig, pdAddrs, httpPort, tcpPort,
deployDir, dataDir, tmpDir, logDir, servicePort, metricsPort,
ip, strings.Join(tidbStatusAddrs, ","), clusterManagerPath, tiflashDaemonConfig)
ip, strings.Join(tidbStatusAddrs, ","), clusterManagerPath, tiflashDaemonConfig, tiflashMarkCacheSize)
}
_, err := w.Write([]byte(conf))
return err
Expand Down
11 changes: 11 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ const (
dbPort = "db.port"
pdHost = "pd.host"
pdPort = "pd.port"
ticdcHost = "ticdc.host"
ticdcPort = "ticdc.port"

// config paths
dbConfig = "db.config"
Expand Down Expand Up @@ -338,6 +340,8 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().Int(dbPort, defaultOptions.TiDB.Port, "Playground TiDB port. If not provided, TiDB will use 4000 as its port")
rootCmd.Flags().String(pdHost, defaultOptions.PD.Host, "Playground PD host. If not provided, PD will still use `host` flag as its host")
rootCmd.Flags().Int(pdPort, defaultOptions.PD.Port, "Playground PD port. If not provided, PD will use 2379 as its port")
rootCmd.Flags().String(ticdcHost, defaultOptions.TiCDC.Host, "Playground TiCDC host. If not provided, TiDB will still use `host` flag as its host")
rootCmd.Flags().Int(ticdcPort, defaultOptions.TiCDC.Port, "Playground TiCDC port. If not provided, TiCDC will use 8300 as its port")

rootCmd.Flags().String(dbConfig, defaultOptions.TiDB.ConfigPath, "TiDB instance configuration file")
rootCmd.Flags().String(kvConfig, defaultOptions.TiKV.ConfigPath, "TiKV instance configuration file")
Expand Down Expand Up @@ -500,6 +504,13 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}
case ticdcHost:
options.TiCDC.Host = flag.Value.String()
case ticdcPort:
options.TiCDC.Port, err = strconv.Atoi(flag.Value.String())
if err != nil {
return
}
case pdHost:
options.PD.Host = flag.Value.String()
case pdPort:
Expand Down
2 changes: 1 addition & 1 deletion components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i
ins = inst
p.tiflashs = append(p.tiflashs, inst)
case spec.ComponentCDC:
inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds)
ins = inst
p.ticdcs = append(p.ticdcs, inst)
case spec.ComponentTiKVCDC:
Expand Down
8 changes: 8 additions & 0 deletions embed/templates/config/prometheus.yml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ scrape_configs:
{{- end}}
labels:
group: 'tiflash'
{{- end}}
{{- if .CDCAddrs}}
- targets:
{{- range .CDCAddrs}}
- '{{.}}'
{{- end}}
labels:
group: 'ticdc'
{{- end}}
relabel_configs:
- source_labels: [__address__]
Expand Down
1 change: 0 additions & 1 deletion pkg/cluster/ansible/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ default_profile = "default"
display_name = "TiFlash"
http_port = 11316
listen_host = "0.0.0.0"
mark_cache_size = 5368709120
path = "/data1/test-cluster/leiysky-ansible-test-deploy/tiflash/data/db"
tcp_port = 11315
tmp_path = "/data1/test-cluster/leiysky-ansible-test-deploy/tiflash/data/db/tmp"
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/executor/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"os/user"
"path/filepath"
Expand All @@ -26,7 +27,6 @@ import (
"github.com/fatih/color"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/tui"
"github.com/pingcap/tiup/pkg/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -110,7 +110,7 @@ func (l *Local) Execute(ctx context.Context, cmd string, sudo bool, timeout ...t
// Transfer implements Executer interface.
func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int, _ bool) error {
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
if err := os.MkdirAll(targetPath, 0755); err != nil {
return err
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/cluster/executor/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, downloa
defer client.Close()
defer session.Close()

err = os.MkdirAll(filepath.Dir(dst), 0755)
if err != nil {
return nil
}
return ScpDownload(session, client, src, dst, limit, compress)
}

Expand Down Expand Up @@ -379,7 +383,7 @@ func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, downl

if download {
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
if err := os.MkdirAll(targetPath, 0755); err != nil {
return err
}
args = append(args, fmt.Sprintf("%s@%s:%s", e.Config.User, e.Config.Host, src), dst)
Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/manager/cacert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"path/filepath"

perrs "github.com/pingcap/errors"
Expand Down Expand Up @@ -108,7 +109,7 @@ func (m *Manager) genAndSaveCertificate(clusterName string, globalOptions *spec.
if globalOptions.TLSEnabled {
// generate CA
tlsPath := m.specManager.Path(clusterName, spec.TLSCertKeyDir)
if err := utils.CreateDir(tlsPath); err != nil {
if err := os.MkdirAll(tlsPath, 0755); err != nil {
return nil, err
}
ca, err := genAndSaveClusterCA(clusterName, tlsPath)
Expand Down
31 changes: 25 additions & 6 deletions pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/fatih/color"
"github.com/joomcode/errorx"
Expand Down Expand Up @@ -194,11 +196,33 @@ Do you want to continue? [y/N]:`,
return err
}
}

ctx := ctxt.New(
context.Background(),
opt.Concurrency,
m.logger,
)
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
if err != nil {
return err
}

// make sure the cluster is stopped
if offline && !opt.Force {
running := false
topo.IterInstance(func(ins spec.Instance) {
if !running {
status := ins.Status(ctx, time.Duration(opt.APITimeout), tlsCfg, topo.BaseTopo().MasterList...)
if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") {
running = true
}
}
}, opt.Concurrency)

if running {
return perrs.Errorf("cluster is running and cannot be upgraded offline")
}
}

b, err := m.sshTaskBuilder(name, topo, base.User, opt)
if err != nil {
return err
Expand All @@ -214,11 +238,6 @@ Do you want to continue? [y/N]:`,
}).
Build()

ctx := ctxt.New(
context.Background(),
opt.Concurrency,
m.logger,
)
if err := t.Execute(ctx); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
Expand Down
18 changes: 18 additions & 0 deletions pkg/cluster/operation/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,21 @@ func monitorPortMap(options *spec.MonitoredOptions) map[string]int {
spec.ComponentBlackboxExporter: options.BlackboxExporterPort,
}
}

func executeSSHCommand(ctx context.Context, action, host, command string) error {
if command == "" {
return nil
}
e, found := ctxt.GetInner(ctx).GetExecutor(host)
if !found {
return fmt.Errorf("no executor")
}
logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
logger.Infof("\t%s on %s", action, host)
stdout, stderr, err := e.Execute(ctx, command, false)
if err != nil {
return errors.Annotatef(err, "stderr: %s", string(stderr))
}
logger.Infof("\t%s", stdout)
return nil
}
22 changes: 22 additions & 0 deletions pkg/cluster/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package operator

import (
"fmt"
"os"

"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/spec"
Expand Down Expand Up @@ -45,6 +46,7 @@ type Options struct {
SSHProxyIdentity string // the ssh proxy identity file
SSHProxyUsePassword bool // use password instead of identity file for ssh proxy connection
SSHProxyTimeout uint64 // timeout in seconds when connecting the proxy host
SSHCustomScripts SSHCustomScripts // custom scripts to be executed during the operation

// What type of things should we cleanup in clean command
CleanupData bool // should we cleanup data
Expand All @@ -59,6 +61,26 @@ type Options struct {
Operation Operation
}

// SSHCustomScripts represents the custom ssh script set to be executed during cluster operations
type SSHCustomScripts struct {
BeforeRestartInstance SSHCustomScript
AfterRestartInstance SSHCustomScript
}

// SSHCustomScript represents a custom ssh script to be executed during cluster operations
type SSHCustomScript struct {
Raw string
}

// Command returns the ssh command in string format
func (s SSHCustomScript) Command() string {
b, err := os.ReadFile(s.Raw)
if err != nil {
return s.Raw
}
return string(b)
}

// Operation represents the type of cluster operation
type Operation byte

Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func upgradeInstance(
rollingInstance, isRollingInstance = instance.(spec.RollingUpdateInstance)
}

err = executeSSHCommand(ctx, "Executing pre-upgrade command", instance.GetHost(), options.SSHCustomScripts.BeforeRestartInstance.Command())
if err != nil {
return err
}

if isRollingInstance {
err := rollingInstance.PreRestart(ctx, topo, int(options.APITimeout), tlsCfg)
if err != nil && !options.Force {
Expand All @@ -229,6 +234,11 @@ func upgradeInstance(
}
}

err = executeSSHCommand(ctx, "Executing post-upgrade command", instance.GetHost(), options.SSHCustomScripts.AfterRestartInstance.Command())
if err != nil {
return err
}

return nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/cluster/spec/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"path"
"path/filepath"

utils2 "github.com/pingcap/tiup/pkg/utils"

"github.com/pingcap/errors"
tiuplocaldata "github.com/pingcap/tiup/pkg/localdata"
)
Expand Down Expand Up @@ -82,7 +80,7 @@ func Initialize(base string) error {
})
initialized = true
// make sure the dir exist
return utils2.CreateDir(profileDir)
return os.MkdirAll(profileDir, 0755)
}

// ProfileDir returns the full profile directory path of TiUP.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/spec_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (s *SpecManager) GetAllClusters() (map[string]Metadata, error) {

// ensureDir ensures that the cluster directory exists.
func (s *SpecManager) ensureDir(clusterName string) error {
if err := utils.CreateDir(s.Path(clusterName)); err != nil {
if err := os.MkdirAll(s.Path(clusterName), 0755); err != nil {
return ErrCreateDirFailed.
Wrap(err, "Failed to create cluster metadata directory '%s'", s.Path(clusterName)).
WithProperty(tui.SuggestionFromString("Please check file system permissions and try again."))
Expand Down
Loading

0 comments on commit abbb58d

Please sign in to comment.