@@ -4,9 +4,10 @@ import (
4
4
"bytes"
5
5
"context"
6
6
"crypto/tls"
7
- "encoding/json"
8
7
"errors"
9
8
"fmt"
9
+ "github.com/goccy/go-json"
10
+ "github.com/valyala/fasthttp"
10
11
"io"
11
12
"math"
12
13
"math/rand"
@@ -111,10 +112,19 @@ var (
111
112
HTTPErrorCode : 500 ,
112
113
}
113
114
115
+ ErrBackendResponseDecompressionError = & RPCErr {
116
+ Code : JSONRPCErrorInternal - 22 ,
117
+ Message : "backend response too large" ,
118
+ HTTPErrorCode : 500 ,
119
+ }
120
+
114
121
ErrBackendUnexpectedJSONRPC = errors .New ("backend returned an unexpected JSON-RPC response" )
115
122
116
123
ErrConsensusGetReceiptsCantBeBatched = errors .New ("consensus_getReceipts cannot be batched" )
117
124
ErrConsensusGetReceiptsInvalidTarget = errors .New ("unsupported consensus_receipts_target" )
125
+
126
+ // For the new fastHTTP client
127
+ headerContentTypeJson = []byte ("application/json" )
118
128
)
119
129
120
130
func ErrInvalidRequest (msg string ) * RPCErr {
@@ -141,6 +151,7 @@ type Backend struct {
141
151
authUsername string
142
152
authPassword string
143
153
headers map [string ]string
154
+ fastClient * fasthttp.Client
144
155
client * LimitedHTTPClient
145
156
dialer * websocket.Dialer
146
157
maxRetries int
@@ -318,12 +329,29 @@ func NewBackend(
318
329
rpcSemaphore * semaphore.Weighted ,
319
330
opts ... BackendOpt ,
320
331
) * Backend {
332
+ readTimeout , _ := time .ParseDuration ("10000ms" )
333
+ writeTimeout , _ := time .ParseDuration ("10000ms" )
334
+ maxIdleConnDuration , _ := time .ParseDuration ("1h" )
321
335
backend := & Backend {
322
336
Name : name ,
323
337
rpcURL : rpcURL ,
324
338
wsURL : wsURL ,
325
339
maxResponseSize : math .MaxInt64 ,
326
- client : & LimitedHTTPClient {
340
+ fastClient : & fasthttp.Client {
341
+ MaxConnsPerHost : 16384 ,
342
+ ReadTimeout : readTimeout ,
343
+ WriteTimeout : writeTimeout ,
344
+ MaxIdleConnDuration : maxIdleConnDuration ,
345
+ NoDefaultUserAgentHeader : true , // Don't send: User-Agent: fasthttp
346
+ DisableHeaderNamesNormalizing : true , // If you set the case on your headers correctly you can enable this
347
+ DisablePathNormalizing : true ,
348
+ // increase DNS cache time to five minutes
349
+ Dial : (& fasthttp.TCPDialer {
350
+ Concurrency : 32768 ,
351
+ DNSCacheDuration : time .Minute * 5 ,
352
+ }).Dial ,
353
+ },
354
+ client : & LimitedHTTPClient { // Keep legacy client for now, TODO remove
327
355
Client : http.Client {Timeout : 5 * time .Second },
328
356
sem : rpcSemaphore ,
329
357
backendName : name ,
@@ -550,21 +578,10 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
550
578
body = mustMarshalJSON (rpcReqs )
551
579
}
552
580
553
- httpReq , err := http .NewRequestWithContext (ctx , "POST" , b .rpcURL , bytes .NewReader (body ))
554
- if err != nil {
555
- b .intermittentErrorsSlidingWindow .Incr ()
556
- RecordBackendNetworkErrorRateSlidingWindow (b , b .ErrorRate ())
557
- return nil , wrapErr (err , "error creating backend request" )
558
- }
559
-
560
- if b .authPassword != "" {
561
- httpReq .SetBasicAuth (b .authUsername , b .authPassword )
562
- }
563
-
564
- opTxProxyAuth := GetOpTxProxyAuthHeader (ctx )
565
- if opTxProxyAuth != "" {
566
- httpReq .Header .Set (DefaultOpTxProxyAuthHeader , opTxProxyAuth )
567
- }
581
+ req := fasthttp .AcquireRequest ()
582
+ req .SetRequestURI (b .rpcURL )
583
+ req .Header .SetMethod (fasthttp .MethodPost )
584
+ req .SetBody (body )
568
585
569
586
xForwardedFor := GetXForwardedFor (ctx )
570
587
if b .stripTrailingXFF {
@@ -573,42 +590,63 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
573
590
xForwardedFor = fmt .Sprintf ("%s, %s" , xForwardedFor , b .proxydIP )
574
591
}
575
592
576
- httpReq .Header .Set ("content-type" , "application/json" )
577
- httpReq .Header .Set ("X-Forwarded-For" , xForwardedFor )
593
+ req .Header .SetContentTypeBytes (headerContentTypeJson )
594
+ req .Header .Set ("X-Forwarded-For" , xForwardedFor )
595
+
596
+ // Geth native backend supports gzip
597
+ req .Header .Set ("Accept-Encoding" , "gzip" )
578
598
579
599
for name , value := range b .headers {
580
- httpReq .Header .Set (name , value )
600
+ req .Header .Set (name , value )
581
601
}
582
602
583
603
start := time .Now ()
584
- httpRes , err := b .client .DoLimited (httpReq )
604
+
605
+ httpRes := fasthttp .AcquireResponse ()
606
+
607
+ reqTimeout := time .Duration (5000 ) * time .Millisecond
608
+
609
+ err := b .fastClient .DoTimeout (req , httpRes , reqTimeout )
585
610
if err != nil {
586
611
b .intermittentErrorsSlidingWindow .Incr ()
587
612
RecordBackendNetworkErrorRateSlidingWindow (b , b .ErrorRate ())
588
- return nil , wrapErr (err , "error in backend request" )
613
+ return nil , wrapErr (err , "err in backend request" )
589
614
}
615
+ fasthttp .ReleaseRequest (req )
616
+ defer fasthttp .ReleaseResponse (httpRes )
590
617
591
618
metricLabelMethod := rpcReqs [0 ].Method
592
619
if isBatch {
593
620
metricLabelMethod = "<batch>"
594
621
}
622
+
623
+ sc := httpRes .StatusCode ()
624
+
595
625
rpcBackendHTTPResponseCodesTotal .WithLabelValues (
596
626
GetAuthCtx (ctx ),
597
627
b .Name ,
598
628
metricLabelMethod ,
599
- strconv .Itoa (httpRes . StatusCode ),
629
+ strconv .Itoa (sc ),
600
630
strconv .FormatBool (isBatch ),
601
631
).Inc ()
602
632
603
633
// Alchemy returns a 400 on bad JSONs, so handle that case
604
- if httpRes . StatusCode != 200 && httpRes . StatusCode != 400 {
634
+ if sc != 200 && sc != 400 {
605
635
b .intermittentErrorsSlidingWindow .Incr ()
606
636
RecordBackendNetworkErrorRateSlidingWindow (b , b .ErrorRate ())
607
637
return nil , fmt .Errorf ("response code %d" , httpRes .StatusCode )
608
638
}
609
639
610
- defer httpRes .Body .Close ()
611
- resB , err := io .ReadAll (LimitReader (httpRes .Body , b .maxResponseSize ))
640
+ // defer httpRes.Body.Close()
641
+
642
+ // This should intelligently choose decompression based on whether the downstream
643
+ // backend supported our request or not.
644
+ bodyUncomp , err := httpRes .BodyUncompressed ()
645
+ if err != nil {
646
+ return nil , ErrBackendResponseDecompressionError
647
+ }
648
+
649
+ resB , err := io .ReadAll (LimitReader (bytes .NewReader (bodyUncomp ), b .maxResponseSize ))
612
650
if errors .Is (err , ErrLimitReaderOverLimit ) {
613
651
return nil , ErrBackendResponseTooLarge
614
652
}
@@ -649,9 +687,9 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
649
687
650
688
// capture the HTTP status code in the response. this will only
651
689
// ever be 400 given the status check on line 318 above.
652
- if httpRes . StatusCode != 200 {
690
+ if sc != 200 {
653
691
for _ , res := range rpcRes {
654
- res .Error .HTTPErrorCode = httpRes . StatusCode
692
+ res .Error .HTTPErrorCode = sc
655
693
}
656
694
}
657
695
duration := time .Since (start )
@@ -1423,6 +1461,14 @@ func (bg *BackendGroup) ForwardRequestToBackendGroup(
1423
1461
error : err ,
1424
1462
}
1425
1463
}
1464
+
1465
+ if errors .Is (err , ErrBackendResponseDecompressionError ) {
1466
+ return & BackendGroupRPCResponse {
1467
+ RPCRes : nil ,
1468
+ ServedBy : "" ,
1469
+ error : err ,
1470
+ }
1471
+ }
1426
1472
if errors .Is (err , ErrBackendOffline ) {
1427
1473
log .Warn (
1428
1474
"skipping offline backend" ,
0 commit comments