Skip to content

Commit

Permalink
cdc: before drain the capture, check the capture can be found (#2059)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Nov 3, 2022
1 parent fb0be5e commit cec291d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
28 changes: 27 additions & 1 deletion pkg/cluster/api/cdcapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@ func drainCapture(client *CDCOpenAPIClient, target string) (int, error) {
}

// DrainCapture request cdc owner move all tables on the target capture to other captures.
func (c *CDCOpenAPIClient) DrainCapture(target string, apiTimeoutSeconds int) error {
func (c *CDCOpenAPIClient) DrainCapture(addr, target string, apiTimeoutSeconds int) error {
if _, err := c.getCaptureByID(target); err != nil {
c.l().Debugf("cdc drain capture failed, cannot find the capture, address: %s, target: %s, err: %+v", addr, target, err)
return err
}
c.l().Infof("\t Start drain the capture, address: %s, captureID: %s", addr, target)
start := time.Now()
err := utils.Retry(func() error {
count, err := drainCapture(c, target)
Expand Down Expand Up @@ -196,6 +201,27 @@ func (c *CDCOpenAPIClient) GetOwner() (result *Capture, err error) {
return result, err
}

func (c *CDCOpenAPIClient) getCaptureByID(id string) (*Capture, error) {
var result *Capture
err := utils.Retry(func() error {
captures, err := c.GetAllCaptures()
if err != nil {
return err
}
for _, capture := range captures {
if capture.ID == id {
result = capture
return nil
}
}
return fmt.Errorf("target capture not found")
}, utils.RetryOption{
Delay: time.Second,
Timeout: 10 * time.Second,
})
return result, err
}

// GetCaptureByAddr return the capture information by the address
func (c *CDCOpenAPIClient) GetCaptureByAddr(addr string) (result *Capture, err error) {
captures, err := c.GetAllCaptures()
Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,10 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
}
}

if err := client.DrainCapture(captureID, apiTimeoutSeconds); err != nil {
if err := client.DrainCapture(address, captureID, apiTimeoutSeconds); err != nil {
logger.Debugf("cdc pre-restart finished, drain the capture failed, "+
"addr: %s, captureID: %s, err: %+v, elapsed: %+v", address, captureID, err, time.Since(start))
// if we drain any one capture failed, no need to drain other captures, just trigger hard restart
return nil
}

Expand Down

0 comments on commit cec291d

Please sign in to comment.