Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/go/internal/modfetch: retry failed fetches #72019

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cmd/go/internal/modfetch/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type proxySpec struct {
// fallBackOnError is true if a request should be attempted on the next proxy
// in the list after any error from this proxy. If fallBackOnError is false,
// the request will only be attempted on the next proxy if the error is
// equivalent to os.ErrNotFound, which is true for 404 and 410 responses.
// equivalent to fs.ErrNotExist, which is true for 404 and 410 responses.
fallBackOnError bool
}

Expand Down
163 changes: 161 additions & 2 deletions src/cmd/go/internal/modfetch/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package modfetch

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
mathrand "math/rand"
"os"
"strconv"
"time"
Expand Down Expand Up @@ -210,13 +212,13 @@ func Lookup(ctx context.Context, proxy, path string) Repo {
}

return lookupCache.Do(lookupCacheKey{proxy, path}, func() Repo {
return newCachingRepo(ctx, path, func(ctx context.Context) (Repo, error) {
return newRetryingRepo(6, newCachingRepo(ctx, path, func(ctx context.Context) (Repo, error) {
r, err := lookup(ctx, proxy, path)
if err == nil && traceRepo {
r = newLoggingRepo(r)
}
return r, err
})
}))
})
}

Expand Down Expand Up @@ -436,3 +438,160 @@ func (notExistError) Is(target error) bool {
func (e notExistError) Unwrap() error {
return e.err
}

// A retryingRepo is a wrapper around an underlying Repo
// that retries each operation if it fails with a temporary error.
type retryingRepo struct {
maxRetries int
r Repo
}

func (r *retryingRepo) ModulePath() string {
return r.r.ModulePath()
}

func (r *retryingRepo) CheckReuse(ctx context.Context, old *codehost.Origin) error {
_, err := wrapWithRetries(ctx, r.maxRetries, func() (any, error) {
return nil, r.r.CheckReuse(ctx, old)
})
return err
}

func (r *retryingRepo) Versions(ctx context.Context, prefix string) (*Versions, error) {
return wrapWithRetries(ctx, r.maxRetries, func() (*Versions, error) {
return r.r.Versions(ctx, prefix)
})
}

func (r *retryingRepo) Stat(ctx context.Context, rev string) (*RevInfo, error) {
return wrapWithRetries(ctx, r.maxRetries, func() (*RevInfo, error) {
return r.r.Stat(ctx, rev)
})
}

func (r *retryingRepo) Latest(ctx context.Context) (*RevInfo, error) {
return wrapWithRetries(ctx, r.maxRetries, func() (*RevInfo, error) {
return r.r.Latest(ctx)
})
}

func (r *retryingRepo) GoMod(ctx context.Context, version string) (data []byte, err error) {
return wrapWithRetries(ctx, r.maxRetries, func() ([]byte, error) {
return r.r.GoMod(ctx, version)
})
}

// permanentError is a wrapper around an error to indicate that it is not temporary.
type permanentError struct {
error
}

func (permanentError) Temporary() bool {
return false
}

func (r *retryingRepo) Zip(ctx context.Context, dst io.Writer, version string) error {
// This is a little trickier because we need to avoid partial writes to dst.
// In the case of a partial write,
// we attempt to truncate and rewind the file to the beginning.
// If we can't do that, we do not retry.
_, err := wrapWithRetries(ctx, r.maxRetries, func() (any, error) {
err := r.r.Zip(ctx, dst, version)
if err == nil { // if NO error
return nil, nil
}

// If there is an error downloading the file,
// there is little we can do without exposing implementation details of the caller.
// We can retry if we can rewind and truncate dst (true if dst is a temp file).
type truncateSeeker interface {
Truncate(size int64) error
io.Seeker
}

f, ok := dst.(truncateSeeker)
if !ok {
return nil, permanentError{err}
}

// Truncate the file back to empty.
if truncErr := f.Truncate(0); truncErr != nil {
return nil, permanentError{truncErr}
}
// Seek back to the beginning of the file.
if _, seekErr := f.Seek(0, io.SeekStart); seekErr != nil {
return nil, permanentError{seekErr}
}
return nil, err // maybe we can retry
})
return err
}

func newRetryingRepo(maxRetries int, r Repo) *retryingRepo {
return &retryingRepo{
maxRetries: maxRetries,
r: r,
}
}

func wrapWithRetries[T any](ctx context.Context, maxRetries int, f func() (T, error)) (v T, err error) {
for retry := 0; ; retry++ {
v, err = f()
if err == nil {
return
}

if retry >= maxRetries || !shouldRetry(err) {
return
}

// After the first retry,
// do exponential backoff with 10% jitter starting at 1s.
if retry == 0 {
continue
}
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
tm := time.NewTimer(time.Second * time.Duration(backoff))
select {
case <-tm.C:
case <-ctx.Done():
}
tm.Stop()
}
}

func shouldRetry(err error) bool {
if err == nil {
return false
}

if errors.Is(err, fs.ErrNotExist) {
return false
}

var isTimeout interface{ Timeout() bool }
if errors.As(err, &isTimeout) && isTimeout.Timeout() {
return true
}

var httpError *web.HTTPError
if errors.As(err, &httpError) {
switch httpError.StatusCode {
case 0, 429: // No HTTP response, Too Many Requests
return true
case 404, 410, 501: // Not Found, Gone, Not Implemented
return false
}

if httpError.StatusCode >= 500 {
return true
}
}

var isTemporary interface{ Temporary() bool }
if errors.As(err, &isTemporary) {
return isTemporary.Temporary()
}
return false
}