Skip to content

Commit

Permalink
Revert "dns: stop polling for updates; use UpdateState API (#3165)"
Browse files Browse the repository at this point in the history
This reverts commit e5e980f.
  • Loading branch information
menghanl authored Nov 26, 2019
1 parent 7e7050b commit 8d64ed6
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 149 deletions.
139 changes: 101 additions & 38 deletions internal/resolver/dns/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
"sync"
"time"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
Expand All @@ -48,6 +49,7 @@ func init() {

const (
defaultPort = "443"
defaultFreq = time.Minute * 30
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
Expand Down Expand Up @@ -97,10 +99,13 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {

// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
return &dnsBuilder{}
return &dnsBuilder{minFreq: defaultFreq}
}

type dnsBuilder struct{}
type dnsBuilder struct {
// minimum frequency of polling the DNS server.
minFreq time.Duration
}

// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
Expand All @@ -110,20 +115,33 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
}

// IP address.
if ipAddr, ok := formatIP(host); ok {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
return deadResolver{}, nil
if net.ParseIP(host) != nil {
host, _ = formatIP(host)
addr := []resolver.Address{{Addr: host + ":" + port}}
i := &ipResolver{
cc: cc,
ip: addr,
rn: make(chan struct{}, 1),
q: make(chan struct{}),
}
cc.NewAddress(addr)
go i.watcher()
return i, nil
}

// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
bc := backoff.DefaultConfig
bc.MaxDelay = b.minFreq
d := &dnsResolver{
freq: b.minFreq,
backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
Expand All @@ -139,7 +157,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts

d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}

Expand All @@ -154,23 +171,53 @@ type netResolver interface {
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}

// deadResolver is a resolver that does nothing.
type deadResolver struct{}
// ipResolver watches for the name resolution update for an IP address.
type ipResolver struct {
cc resolver.ClientConn
ip []resolver.Address
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
q chan struct{}
}

// ResolveNow resend the address it stores, no resolution is needed.
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOptions) {
select {
case i.rn <- struct{}{}:
default:
}
}

func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {}
// Close closes the ipResolver.
func (i *ipResolver) Close() {
close(i.q)
}

func (deadResolver) Close() {}
func (i *ipResolver) watcher() {
for {
select {
case <-i.rn:
i.cc.NewAddress(i.ip)
case <-i.q:
return
}
}
}

// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
freq time.Duration
backoff internalbackoff.Exponential
retryCount int
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
t *time.Timer
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
Expand All @@ -182,7 +229,7 @@ type dnsResolver struct {
}

// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
Expand All @@ -193,6 +240,7 @@ func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
d.t.Stop()
}

func (d *dnsResolver) watcher() {
Expand All @@ -201,11 +249,29 @@ func (d *dnsResolver) watcher() {
select {
case <-d.ctx.Done():
return
case <-d.t.C:
case <-d.rn:
if !d.t.Stop() {
// Before resetting a timer, it should be stopped to prevent racing with
// reads on it's channel.
<-d.t.C
}
}

state := d.lookup()
d.cc.UpdateState(*state)
result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
if len(result) == 0 {
d.retryCount++
d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
d.retryCount = 0
d.t.Reset(d.freq)
}
if sc != "" { // We get empty string when disabled or the TXT lookup failed.
d.cc.NewServiceConfig(sc)
}
d.cc.NewAddress(result)

// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
Expand Down Expand Up @@ -248,12 +314,11 @@ func (d *dnsResolver) lookupSRV() []resolver.Address {
return newAddrs
}

func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
func (d *dnsResolver) lookupTXT() string {
ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
if err != nil {
err = fmt.Errorf("error from DNS TXT record lookup: %v", err)
grpclog.Infoln("grpc:", err)
return &serviceconfig.ParseResult{Err: err}
grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
return ""
}
var res string
for _, s := range ss {
Expand All @@ -262,12 +327,10 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {

// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
grpclog.Warningf("grpc: DNS TXT record %v missing %v attribute", res, txtAttribute)
// This is not an error; it is the equivalent of not having a service config.
return nil
grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
return ""
}
sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
return d.cc.ParseServiceConfig(sc)
return strings.TrimPrefix(res, txtAttribute)
}

func (d *dnsResolver) lookupHost() []resolver.Address {
Expand All @@ -289,15 +352,15 @@ func (d *dnsResolver) lookupHost() []resolver.Address {
return newAddrs
}

func (d *dnsResolver) lookup() *resolver.State {
srv := d.lookupSRV()
state := &resolver.State{
Addresses: append(d.lookupHost(), srv...),
}
if !d.disableServiceConfig {
state.ServiceConfig = d.lookupTXT()
func (d *dnsResolver) lookup() ([]resolver.Address, string) {
newAddrs := d.lookupSRV()
// Support fallback to non-balancer address.
newAddrs = append(newAddrs, d.lookupHost()...)
if d.disableServiceConfig {
return newAddrs, ""
}
return state
sc := d.lookupTXT()
return newAddrs, canaryingSC(sc)
}

// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
Expand Down
Loading

0 comments on commit 8d64ed6

Please sign in to comment.