Skip to content

Commit 38103d0

Browse files
committed
DS client - add TLS support
Add zkevm.l2-datastreamer-use-tls flag If set to true, a TLS connection will be used. This also enables SNI (Server Name Indication) support and allows connecting to ds server behind virtual hosting/load balancer
1 parent 521eb5f commit 38103d0

File tree

10 files changed

+40
-15
lines changed

10 files changed

+40
-15
lines changed

cmd/utils/flags.go

+5
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,11 @@ var (
415415
Usage: "L2 datastreamer endpoint",
416416
Value: "",
417417
}
418+
L2DataStreamerUseTLSFlag = cli.BoolFlag{
419+
Name: "zkevm.l2-datastreamer-use-tls",
420+
Usage: "Use TLS connection to L2 datastreamer endpoint",
421+
Value: false,
422+
}
418423
L2DataStreamerTimeout = cli.StringFlag{
419424
Name: "zkevm.l2-datastreamer-timeout",
420425
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",

eth/backend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien
13021302

13031303
// creates a datastream client with default parameters
13041304
func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient {
1305-
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
1305+
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
13061306
}
13071307

13081308
func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig *chain.Config) error {

eth/ethconfig/config_zkevm.go

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Zk struct {
1212
L2ChainId uint64
1313
L2RpcUrl string
1414
L2DataStreamerUrl string
15+
L2DataStreamerUseTLS bool
1516
L2DataStreamerTimeout time.Duration
1617
L2ShortCircuitToVerifiedBatch bool
1718
L1SyncStartBlock uint64

turbo/cli/default_flags.go

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ var DefaultFlags = []cli.Flag{
176176
&utils.L2ChainIdFlag,
177177
&utils.L2RpcUrlFlag,
178178
&utils.L2DataStreamerUrlFlag,
179+
&utils.L2DataStreamerUseTLSFlag,
179180
&utils.L2DataStreamerTimeout,
180181
&utils.L2ShortCircuitToVerifiedBatchFlag,
181182
&utils.L1SyncStartBlock,

turbo/cli/flags_zkevm.go

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
162162
L2ChainId: ctx.Uint64(utils.L2ChainIdFlag.Name),
163163
L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name),
164164
L2DataStreamerUrl: ctx.String(utils.L2DataStreamerUrlFlag.Name),
165+
L2DataStreamerUseTLS: ctx.Bool(utils.L2DataStreamerUseTLSFlag.Name),
165166
L2DataStreamerTimeout: l2DataStreamTimeout,
166167
L2ShortCircuitToVerifiedBatch: l2ShortCircuitToVerifiedBatchVal,
167168
L1SyncStartBlock: ctx.Uint64(utils.L1SyncStartBlock.Name),

zk/datastream/client/stream_client.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"context"
5+
"crypto/tls"
56
"encoding/binary"
67
"errors"
78
"fmt"
@@ -10,10 +11,11 @@ import (
1011
"sync/atomic"
1112
"time"
1213

14+
"sync"
15+
1316
"github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream"
1417
"github.com/ledgerwatch/erigon/zk/datastream/types"
1518
"github.com/ledgerwatch/log/v3"
16-
"sync"
1719
)
1820

1921
type StreamType uint64
@@ -68,6 +70,9 @@ type StreamClient struct {
6870

6971
lastError error
7072
started bool
73+
74+
useTLS bool
75+
tlsConfig *tls.Config
7176
}
7277

7378
const (
@@ -84,7 +89,7 @@ const (
8489

8590
// Creates a new client fo datastream
8691
// server must be in format "url:port"
87-
func NewClient(ctx context.Context, server string, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
92+
func NewClient(ctx context.Context, server string, useTLS bool, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
8893
c := &StreamClient{
8994
ctx: ctx,
9095
checkTimeout: checkTimeout,
@@ -94,8 +99,17 @@ func NewClient(ctx context.Context, server string, version int, checkTimeout tim
9499
entryChan: make(chan interface{}, 100000),
95100
currentFork: uint64(latestDownloadedForkId),
96101
mtxStreaming: &sync.Mutex{},
102+
useTLS: useTLS,
103+
tlsConfig: &tls.Config{},
97104
}
98105

106+
// Extract hostname from server address (removing port if present)
107+
host, _, err := net.SplitHostPort(c.server)
108+
if err != nil {
109+
host = c.server // If no port was specified, use the full server string
110+
}
111+
c.tlsConfig.ServerName = host
112+
99113
return c
100114
}
101115

@@ -282,9 +296,12 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 {
282296

283297
// Opens a TCP connection to the server
284298
func (c *StreamClient) Start() error {
285-
// Connect to server
286299
var err error
287-
c.conn, err = net.Dial("tcp", c.server)
300+
if c.useTLS {
301+
c.conn, err = tls.Dial("tcp", c.server, c.tlsConfig)
302+
} else {
303+
c.conn, err = net.Dial("tcp", c.server)
304+
}
288305
if err != nil {
289306
return fmt.Errorf("connecting to server %s: %w", c.server, err)
290307
}

zk/datastream/client/stream_client_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestStreamClientReadHeaderEntry(t *testing.T) {
5050
}
5151

5252
for _, testCase := range testCases {
53-
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
53+
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
5454
server, conn := net.Pipe()
5555
defer server.Close()
5656
defer c.Stop()
@@ -118,7 +118,7 @@ func TestStreamClientReadResultEntry(t *testing.T) {
118118
}
119119

120120
for _, testCase := range testCases {
121-
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
121+
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
122122
server, conn := net.Pipe()
123123
defer server.Close()
124124
defer c.Stop()
@@ -191,7 +191,7 @@ func TestStreamClientReadFileEntry(t *testing.T) {
191191
},
192192
}
193193
for _, testCase := range testCases {
194-
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
194+
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
195195
server, conn := net.Pipe()
196196
defer c.Stop()
197197
defer server.Close()
@@ -215,7 +215,7 @@ func TestStreamClientReadFileEntry(t *testing.T) {
215215
}
216216

217217
func TestStreamClientReadParsedProto(t *testing.T) {
218-
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
218+
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
219219
serverConn, clientConn := net.Pipe()
220220
c.conn = clientConn
221221
c.checkTimeout = 1 * time.Second
@@ -287,7 +287,7 @@ func TestStreamClientGetLatestL2Block(t *testing.T) {
287287
clientConn.Close()
288288
}()
289289

290-
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
290+
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
291291
c.conn = clientConn
292292
c.checkTimeout = 1 * time.Second
293293
c.allowStops = false
@@ -401,7 +401,7 @@ func TestStreamClientGetL2BlockByNumber(t *testing.T) {
401401
clientConn.Close()
402402
}()
403403

404-
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
404+
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
405405
c.header = &types.HeaderEntry{
406406
TotalEntries: 4,
407407
}

zk/datastream/test/data_stream_compare/test_datastream_compare.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func main() {
2525
flag.StringVar(&stream2, "stream2", "", "the second stream to pull data from")
2626
flag.Parse()
2727

28-
client1 := client.NewClient(ctx, stream1, 0, 0, 0)
29-
client2 := client.NewClient(ctx, stream2, 0, 0, 0)
28+
client1 := client.NewClient(ctx, stream1, false, 0, 0, 0)
29+
client2 := client.NewClient(ctx, stream2, false, 0, 0, 0)
3030

3131
err := client1.Start()
3232
if err != nil {

zk/debug_tools/datastream-correctness-check/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func main() {
1919
}
2020

2121
// Create client
22-
client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0)
22+
client := client.NewClient(ctx, cfg.Datastream, false, 3, 500, 0)
2323

2424
// Start client (connect to the server)
2525
defer client.Stop()

zk/stages/stage_batches.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -848,5 +848,5 @@ func getHighestDSL2Block(ctx context.Context, batchCfg BatchesCfg, latestFork ui
848848

849849
func buildNewStreamClient(ctx context.Context, batchesCfg BatchesCfg, latestFork uint16) *client.StreamClient {
850850
cfg := batchesCfg.zkCfg
851-
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork)
851+
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork)
852852
}

0 commit comments

Comments
 (0)