Skip to content

Commit c238b8d

Browse files
Jasonploxiln
Jason
authored andcommittedOct 17, 2021
consumer: re-use http client with keepalives for lookup requests
1 parent 3bce8d0 commit c238b8d

File tree

3 files changed

+26
-17
lines changed

3 files changed

+26
-17
lines changed
 

‎api_request.go

+1-16
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,14 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
2424
return c.Conn.Write(b)
2525
}
2626

27-
func newDeadlineTransport(timeout time.Duration) *http.Transport {
28-
transport := &http.Transport{
29-
DisableKeepAlives: true,
30-
Dial: func(netw, addr string) (net.Conn, error) {
31-
c, err := net.DialTimeout(netw, addr, timeout)
32-
if err != nil {
33-
return nil, err
34-
}
35-
return &deadlinedConn{timeout, c}, nil
36-
},
37-
}
38-
return transport
39-
}
40-
4127
type wrappedResp struct {
4228
Status string `json:"status_txt"`
4329
StatusCode int `json:"status_code"`
4430
Data interface{} `json:"data"`
4531
}
4632

4733
// stores the result in the value pointed to by ret(must be a pointer)
48-
func apiRequestNegotiateV1(method string, endpoint string, headers http.Header, ret interface{}) error {
49-
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
34+
func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint string, headers http.Header, ret interface{}) error {
5035
req, err := http.NewRequest(method, endpoint, nil)
5136
if err != nil {
5237
return err

‎config.go

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ type Config struct {
110110
// reconnection attempts
111111
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
112112
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
113+
LookupdPollTimeout time.Duration `opt:"lookupd_poll_timeout" default:"1m"`
113114

114115
// Maximum duration when REQueueing (for doubling of deferred requeue)
115116
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`

‎consumer.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ type Consumer struct {
128128
lookupdRecheckChan chan int
129129
lookupdHTTPAddrs []string
130130
lookupdQueryIndex int
131+
lookupdHttpClient *http.Client
131132

132133
wg sync.WaitGroup
133134
runningHandlers int32
@@ -326,6 +327,11 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
326327
}
327328
}
328329

330+
// set lookupd http client
331+
func (r *Consumer) SetLookupdHttpClient(httpclient *http.Client) {
332+
r.lookupdHttpClient = httpclient
333+
}
334+
329335
// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
330336
//
331337
// If it is the first to be added, it initiates an HTTP request to discover nsqd
@@ -355,6 +361,23 @@ func (r *Consumer) ConnectToNSQLookupd(addr string) error {
355361
}
356362
}
357363
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, parsedAddr)
364+
if r.lookupdHttpClient == nil {
365+
transport := &http.Transport{
366+
DialContext: (&net.Dialer{
367+
Timeout: r.config.LookupdPollTimeout,
368+
KeepAlive: 30 * time.Second,
369+
}).DialContext,
370+
ResponseHeaderTimeout: r.config.LookupdPollTimeout,
371+
MaxIdleConns: 100,
372+
IdleConnTimeout: 90 * time.Second,
373+
TLSHandshakeTimeout: 10 * time.Second,
374+
}
375+
r.lookupdHttpClient = &http.Client{
376+
Transport: transport,
377+
Timeout: r.config.LookupdPollTimeout,
378+
}
379+
}
380+
358381
numLookupd := len(r.lookupdHTTPAddrs)
359382
r.mtx.Unlock()
360383

@@ -468,7 +491,7 @@ retry:
468491
if r.config.AuthSecret != "" && r.config.LookupdAuthorization {
469492
headers.Set("Authorization", fmt.Sprintf("Bearer %s", r.config.AuthSecret))
470493
}
471-
err := apiRequestNegotiateV1("GET", endpoint, headers, &data)
494+
err := apiRequestNegotiateV1(r.lookupdHttpClient, "GET", endpoint, headers, &data)
472495
if err != nil {
473496
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
474497
retries++

0 commit comments

Comments
 (0)