Skip to content

Commit

Permalink
feat: add backoff and jitter to retry (#643)
Browse files Browse the repository at this point in the history
* feat: add backoff and jitter to retry

* refactor: check before retry wait

* style: rollback old code style

* refactor: do not return error

* refactor: strictly exponential backoff

* refactor: backward compatibility

* refactor: remove useless return error

* refactor: remove useless code

* refactor: rollback code

* refactor: use retry error channel and redirects

* refactor: divide method
  • Loading branch information
proost authored Oct 13, 2024
1 parent 55e0831 commit 113c567
Show file tree
Hide file tree
Showing 11 changed files with 1,940 additions and 889 deletions.
88 changes: 71 additions & 17 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ type singleClient struct {
stop uint32
cmd Builder
retry bool
retryHandler retryHandler
DisableCache bool
}

func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient, error) {
func newSingleClient(opt *ClientOption, prev conn, connFn connFn, retryer retryHandler) (*singleClient, error) {
if len(opt.InitAddress) == 0 {
return nil, ErrNoAddr
}
Expand All @@ -31,22 +32,29 @@ func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient
if err := conn.Dial(); err != nil {
return nil, err
}
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache), nil
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, retryer), nil
}

func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool) *singleClient {
return &singleClient{cmd: builder, conn: conn, retry: retry, DisableCache: disableCache}
func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool, retryer retryHandler) *singleClient {
return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, DisableCache: disableCache}
}

func (c *singleClient) B() Builder {
return c.cmd
}

func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
attempts := 1
retry:
resp = c.conn.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
)
if shouldRetry {
attempts++
goto retry
}
}
if resp.NonRedisError() == nil { // not recycle cmds if error, since cmds may be used later in pipe. consider recycle them by pipe
cmds.PutCompleted(cmd)
Expand Down Expand Up @@ -75,12 +83,19 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [
if len(multi) == 0 {
return nil
}
attempts := 1
retry:
resps = c.conn.DoMulti(ctx, multi...).s
if c.retry && allReadOnly(multi) {
for _, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
)
if shouldRetry {
attempts++
goto retry
}
}
}
}
Expand All @@ -96,12 +111,19 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
if len(multi) == 0 {
return nil
}
attempts := 1
retry:
resps = c.conn.DoMultiCache(ctx, multi...).s
if c.retry {
for _, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
)
if shouldRetry {
attempts++
goto retry
}
}
}
}
Expand All @@ -114,10 +136,15 @@ retry:
}

func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult) {
attempts := 1
retry:
resp = c.conn.DoCache(ctx, cmd, ttl)
if c.retry && c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
if shouldRetry {
attempts++
goto retry
}
}
if err := resp.NonRedisError(); err == nil || err == ErrDoCacheAborted {
cmds.PutCacheable(cmd)
Expand All @@ -126,11 +153,16 @@ retry:
}

func (c *singleClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
attempts := 1
retry:
err = c.conn.Receive(ctx, subscribe, fn)
if c.retry {
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
if shouldRetry {
attempts++
goto retry
}
}
}
if err == nil {
Expand All @@ -141,15 +173,15 @@ retry:

func (c *singleClient) Dedicated(fn func(DedicatedClient) error) (err error) {
wire := c.conn.Acquire()
dsc := &dedicatedSingleClient{cmd: c.cmd, conn: c.conn, wire: wire, retry: c.retry}
dsc := &dedicatedSingleClient{cmd: c.cmd, conn: c.conn, wire: wire, retry: c.retry, retryHandler: c.retryHandler}
err = fn(dsc)
dsc.release()
return err
}

func (c *singleClient) Dedicate() (DedicatedClient, func()) {
wire := c.conn.Acquire()
dsc := &dedicatedSingleClient{cmd: c.cmd, conn: c.conn, wire: wire, retry: c.retry}
dsc := &dedicatedSingleClient{cmd: c.cmd, conn: c.conn, wire: wire, retry: c.retry, retryHandler: c.retryHandler}
return dsc, dsc.release
}

Expand All @@ -168,21 +200,29 @@ type dedicatedSingleClient struct {
mark uint32
cmd Builder

retry bool
retry bool
retryHandler retryHandler
}

func (c *dedicatedSingleClient) B() Builder {
return c.cmd
}

func (c *dedicatedSingleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
attempts := 1
retry:
if err := c.check(); err != nil {
return newErrResult(err)
}
resp = c.wire.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
)
if shouldRetry {
attempts++
goto retry
}
}
if resp.NonRedisError() == nil {
cmds.PutCompleted(cmd)
Expand All @@ -194,6 +234,7 @@ func (c *dedicatedSingleClient) DoMulti(ctx context.Context, multi ...Completed)
if len(multi) == 0 {
return nil
}
attempts := 1
retryable := c.retry
if retryable {
retryable = allReadOnly(multi)
Expand All @@ -203,10 +244,16 @@ retry:
return fillErrs(len(multi), err)
}
resp = c.wire.DoMulti(ctx, multi...).s
if retryable && anyRetryable(resp, c.wire, ctx) {
goto retry
}
for i, cmd := range multi {
if retryable && isRetryable(resp[i].NonRedisError(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp[i].Error(),
)
if shouldRetry {
attempts++
goto retry
}
}
if resp[i].NonRedisError() == nil {
cmds.PutCompleted(cmd)
}
Expand All @@ -215,14 +262,21 @@ retry:
}

func (c *dedicatedSingleClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
attempts := 1
retry:
if err := c.check(); err != nil {
return err
}
err = c.wire.Receive(ctx, subscribe, fn)
if c.retry {
if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) {
goto retry
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, err,
)
if shouldRetry {
attempts++
goto retry
}
}
}
if err == nil {
Expand Down
Loading

0 comments on commit 113c567

Please sign in to comment.