Skip to content

34 files changed

+1299
-949
lines changed
 

‎.golangci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ linters:
66
enable:
77
- asciicheck
88
- bodyclose
9-
- depguard
109
- dogsled
1110
- dupl
1211
- errcheck

‎cmd/cometbft/commands/run_node.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,15 @@ func AddNodeFlags(cmd *cobra.Command) {
9696
"database directory")
9797

9898
cmd.PersistentFlags().String(
99-
trace.FlagInfluxDBURL,
100-
config.Instrumentation.InfluxURL,
101-
trace.FlagInfluxDBURLDescription,
99+
trace.FlagTracePushConfig,
100+
config.Instrumentation.TracePushConfig,
101+
trace.FlagTracePushConfigDescription,
102102
)
103103

104104
cmd.PersistentFlags().String(
105-
trace.FlagInfluxDBToken,
106-
config.Instrumentation.InfluxToken,
107-
trace.FlagInfluxDBTokenDescription,
105+
trace.FlagTracePullAddress,
106+
config.Instrumentation.TracePullAddress,
107+
trace.FlagTracePullAddressDescription,
108108
)
109109

110110
cmd.PersistentFlags().String(

‎config/config.go

+26-29
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ var (
6262
minSubscriptionBufferSize = 100
6363
defaultSubscriptionBufferSize = 200
6464

65-
// DefaultInfluxTables is a list of tables that are used for storing traces.
65+
// DefaultTracingTables is a list of tables that are used for storing traces.
6666
// This global var is filled by an init function in the schema package. This
6767
// allows for the schema package to contain all the relevant logic while
6868
// avoiding import cycles.
69-
DefaultInfluxTables = []string{}
69+
DefaultTracingTables = ""
7070
)
7171

7272
// Config defines the top level configuration for a CometBFT node
@@ -1188,24 +1188,24 @@ type InstrumentationConfig struct {
11881188
// Instrumentation namespace.
11891189
Namespace string `mapstructure:"namespace"`
11901190

1191-
// InfluxURL is the influxdb url.
1192-
InfluxURL string `mapstructure:"influx_url"`
1191+
// TracePushConfig is the relative path of the push config. This second
1192+
// config contains credentials for where and how often to push trace data.
1193+
TracePushConfig string `mapstructure:"trace_push_config"`
11931194

1194-
// InfluxToken is the influxdb token.
1195-
InfluxToken string `mapstructure:"influx_token"`
1195+
// TracePullAddress is the address that the trace server will listen on for
1196+
// pulling data.
1197+
TracePullAddress string `mapstructure:"trace_pull_address"`
11961198

1197-
// InfluxOrg is the influxdb organization.
1198-
InfluxOrg string `mapstructure:"influx_org"`
1199+
// TraceType is the type of tracer used. Options are "local" and "noop".
1200+
TraceType string `mapstructure:"trace_type"`
11991201

1200-
// InfluxBucket is the influxdb bucket.
1201-
InfluxBucket string `mapstructure:"influx_bucket"`
1202+
// TraceBufferSize is the number of traces to write in a single batch.
1203+
TraceBufferSize int `mapstructure:"trace_push_batch_size"`
12021204

1203-
// InfluxBatchSize is the number of points to write in a single batch.
1204-
InfluxBatchSize int `mapstructure:"influx_batch_size"`
1205-
1206-
// InfluxTables is the list of tables that will be traced. See the
1207-
// pkg/trace/schema for a complete list of tables.
1208-
InfluxTables []string `mapstructure:"influx_tables"`
1205+
// TracingTables is the list of tables that will be traced. See the
1206+
// pkg/trace/schema for a complete list of tables. It is represented as a
1207+
// comma separated string. For example: "consensus_round_state,mempool_tx".
1208+
TracingTables string `mapstructure:"tracing_tables"`
12091209

12101210
// PyroscopeURL is the pyroscope url used to establish a connection with a
12111211
// pyroscope continuous profiling server.
@@ -1229,11 +1229,11 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
12291229
PrometheusListenAddr: ":26660",
12301230
MaxOpenConnections: 3,
12311231
Namespace: "cometbft",
1232-
InfluxURL: "",
1233-
InfluxOrg: "celestia",
1234-
InfluxBucket: "e2e",
1235-
InfluxBatchSize: 20,
1236-
InfluxTables: DefaultInfluxTables,
1232+
TracePushConfig: "",
1233+
TracePullAddress: "",
1234+
TraceType: "noop",
1235+
TraceBufferSize: 1000,
1236+
TracingTables: DefaultTracingTables,
12371237
PyroscopeURL: "",
12381238
PyroscopeTrace: false,
12391239
PyroscopeProfileTypes: []string{
@@ -1264,21 +1264,18 @@ func (cfg *InstrumentationConfig) ValidateBasic() error {
12641264
if cfg.PyroscopeTrace && cfg.PyroscopeURL == "" {
12651265
return errors.New("pyroscope_trace can't be enabled if profiling is disabled")
12661266
}
1267-
// if there is not InfluxURL configured, then we do not need to validate the rest
1267+
// if there is no TracePushConfig configured, then we do not need to validate the rest
12681268
// of the config because we are not connecting.
1269-
if cfg.InfluxURL == "" {
1269+
if cfg.TracePushConfig == "" {
12701270
return nil
12711271
}
1272-
if cfg.InfluxToken == "" {
1272+
if cfg.TracePullAddress == "" {
12731273
return fmt.Errorf("trace pull address is required")
12741274
}
1275-
if cfg.InfluxOrg == "" {
1275+
if cfg.TraceType == "" {
12761276
return fmt.Errorf("trace type is required")
12771277
}
1278-
if cfg.InfluxBucket == "" {
1279-
return fmt.Errorf("bucket is required")
1280-
}
1281-
if cfg.InfluxBatchSize <= 0 {
1278+
if cfg.TraceBufferSize <= 0 {
12821279
return fmt.Errorf("batch size must be greater than 0")
12831280
}
12841281
return nil

‎config/toml.go

+14-13
Original file line numberDiff line numberDiff line change
@@ -547,25 +547,26 @@ max_open_connections = {{ .Instrumentation.MaxOpenConnections }}
547547
# Instrumentation namespace
548548
namespace = "{{ .Instrumentation.Namespace }}"
549549
550-
# The URL of the influxdb instance to use for remote event
551-
# collection. If empty, remote event collection is disabled.
552-
influx_url = "{{ .Instrumentation.InfluxURL }}"
550+
# TracePushConfig is the relative path of the push config.
551+
# This second config contains credentials for where and how often to
552+
# push trace data to. For example, if the config is next to this config,
553+
# it would be "push_config.json".
554+
trace_push_config = "{{ .Instrumentation.TracePushConfig }}"
553555
554-
# The influxdb token to use for remote event collection.
555-
influx_token = "{{ .Instrumentation.InfluxToken }}"
556+
# The tracer pull address specifies which address will be used for pull based
557+
# event collection. If empty, the pull based server will not be started.
558+
trace_pull_address = "{{ .Instrumentation.TracePullAddress }}"
556559
557-
# The influxdb bucket to use for remote event collection.
558-
influx_bucket = "{{ .Instrumentation.InfluxBucket }}"
559-
560-
# The influxdb org to use for event remote collection.
561-
influx_org = "{{ .Instrumentation.InfluxOrg }}"
560+
# The tracer to use for collecting trace data.
561+
trace_type = "{{ .Instrumentation.TraceType }}"
562562
563563
# The size of the batches that are sent to the database.
564-
influx_batch_size = {{ .Instrumentation.InfluxBatchSize }}
564+
trace_push_batch_size = {{ .Instrumentation.TraceBufferSize }}
565565
566566
# The list of tables that are updated when tracing. All available tables and
567-
# their schema can be found in the pkg/trace/schema package.
568-
influx_tables = [{{ range .Instrumentation.InfluxTables }}{{ printf "%q, " . }}{{end}}]
567+
# their schema can be found in the pkg/trace/schema package. It is represented as a
568+
# comma separated string. For example: "consensus_round_state,mempool_tx".
569+
tracing_tables = "{{ .Instrumentation.TracingTables }}"
569570
570571
# The URL of the pyroscope instance to use for continuous profiling.
571572
# If empty, continuous profiling is disabled.

‎consensus/reactor.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Reactor struct {
5050
rs *cstypes.RoundState
5151

5252
Metrics *Metrics
53-
traceClient *trace.Client
53+
traceClient trace.Tracer
5454
}
5555

5656
type ReactorOption func(*Reactor)
@@ -63,7 +63,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
6363
waitSync: waitSync,
6464
rs: consensusState.GetRoundState(),
6565
Metrics: NopMetrics(),
66-
traceClient: &trace.Client{},
66+
traceClient: trace.NoOpTracer(),
6767
}
6868
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
6969

@@ -338,7 +338,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
338338
case *BlockPartMessage:
339339
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
340340
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
341-
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload)
341+
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.Download)
342342
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
343343
default:
344344
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
@@ -357,7 +357,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
357357
cs.Validators.Size(), cs.LastCommit.Size()
358358
cs.mtx.RUnlock()
359359

360-
schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.TransferTypeDownload)
360+
schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.Download)
361361

362362
ps.EnsureVoteBitArrays(height, valSize)
363363
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
@@ -599,7 +599,7 @@ OUTER_LOOP:
599599
Part: *parts,
600600
},
601601
}, logger) {
602-
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload)
602+
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.Upload)
603603
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
604604
}
605605
continue OUTER_LOOP
@@ -783,7 +783,7 @@ OUTER_LOOP:
783783
if vote != nil {
784784
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
785785
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
786-
ps.peer.ID(), schema.TransferTypeUpload)
786+
ps.peer.ID(), schema.Upload)
787787
continue OUTER_LOOP
788788
}
789789
}
@@ -812,7 +812,7 @@ func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes
812812
vote := ps.PickSendVote(votes)
813813
if vote != nil { // if a vote is sent, trace it
814814
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
815-
ps.peer.ID(), schema.TransferTypeUpload)
815+
ps.peer.ID(), schema.Upload)
816816
return true
817817
}
818818
return false
@@ -1046,7 +1046,7 @@ func ReactorMetrics(metrics *Metrics) ReactorOption {
10461046
return func(conR *Reactor) { conR.Metrics = metrics }
10471047
}
10481048

1049-
func ReactorTracing(traceClient *trace.Client) ReactorOption {
1049+
func ReactorTracing(traceClient trace.Tracer) ReactorOption {
10501050
return func(conR *Reactor) { conR.traceClient = traceClient }
10511051
}
10521052

‎consensus/state.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ type State struct {
143143
// for reporting metrics
144144
metrics *Metrics
145145

146-
traceClient *trace.Client
146+
traceClient trace.Tracer
147147
}
148148

149149
// StateOption sets an optional parameter on the State.
@@ -174,7 +174,7 @@ func NewState(
174174
evpool: evpool,
175175
evsw: cmtevents.NewEventSwitch(),
176176
metrics: NopMetrics(),
177-
traceClient: &trace.Client{},
177+
traceClient: trace.NoOpTracer(),
178178
}
179179

180180
// set function defaults (may be overwritten before calling Start)
@@ -217,7 +217,7 @@ func StateMetrics(metrics *Metrics) StateOption {
217217
}
218218

219219
// SetTraceClient sets the remote event collector.
220-
func SetTraceClient(ec *trace.Client) StateOption {
220+
func SetTraceClient(ec trace.Tracer) StateOption {
221221
return func(cs *State) { cs.traceClient = ec }
222222
}
223223

@@ -1845,7 +1845,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
18451845
blockSize := block.Size()
18461846

18471847
// trace some metadata about the block
1848-
schema.WriteBlock(cs.traceClient, block, blockSize)
1848+
schema.WriteBlockSummary(cs.traceClient, block, blockSize)
18491849

18501850
cs.metrics.NumTxs.Set(float64(len(block.Data.Txs)))
18511851
cs.metrics.TotalTxs.Add(float64(len(block.Data.Txs)))

‎go.mod

+26-25
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/Masterminds/semver/v3 v3.2.0
99
github.com/Workiva/go-datastructures v1.0.53
1010
github.com/adlio/schema v1.3.3
11+
github.com/aws/aws-sdk-go v1.40.45
1112
github.com/btcsuite/btcd/btcec/v2 v2.3.2
1213
github.com/btcsuite/btcd/btcutil v1.1.3
1314
github.com/bufbuild/buf v1.15.1
@@ -24,12 +25,11 @@ require (
2425
github.com/golang/protobuf v1.5.3
2526
github.com/golangci/golangci-lint v1.52.0
2627
github.com/google/orderedcode v0.0.1
27-
github.com/google/uuid v1.3.1
28+
github.com/google/uuid v1.4.0
2829
github.com/gorilla/websocket v1.5.0
2930
github.com/grafana/otel-profiling-go v0.5.1
3031
github.com/grafana/pyroscope-go v1.1.1
3132
github.com/gtank/merlin v0.1.1
32-
github.com/influxdata/influxdb-client-go/v2 v2.12.2
3333
github.com/informalsystems/tm-load-test v1.3.0
3434
github.com/lib/pq v1.10.7
3535
github.com/libp2p/go-buffer-pool v0.1.0
@@ -41,8 +41,8 @@ require (
4141
github.com/rs/cors v1.8.3
4242
github.com/sasha-s/go-deadlock v0.3.1
4343
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
44-
github.com/spf13/cobra v1.6.1
45-
github.com/spf13/viper v1.15.0
44+
github.com/spf13/cobra v1.8.0
45+
github.com/spf13/viper v1.18.1
4646
github.com/stretchr/testify v1.8.4
4747
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
4848
github.com/vektra/mockery/v2 v2.23.1
@@ -51,8 +51,8 @@ require (
5151
go.opentelemetry.io/otel/sdk v1.21.0
5252
golang.org/x/crypto v0.21.0
5353
golang.org/x/net v0.23.0
54-
gonum.org/v1/gonum v0.8.2
55-
google.golang.org/grpc v1.59.0
54+
gonum.org/v1/gonum v0.12.0
55+
google.golang.org/grpc v1.60.0
5656
google.golang.org/protobuf v1.31.0
5757
)
5858

@@ -90,16 +90,15 @@ require (
9090
github.com/charithe/durationcheck v0.0.10 // indirect
9191
github.com/chavacava/garif v0.0.0-20230227094218-b8c73b2037b8 // indirect
9292
github.com/chigopher/pathlib v0.12.0 // indirect
93-
github.com/cloudflare/circl v1.3.3 // indirect
93+
github.com/cloudflare/circl v1.3.7 // indirect
9494
github.com/containerd/continuity v0.3.0 // indirect
9595
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d // indirect
96-
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
96+
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
9797
github.com/curioswitch/go-reassign v0.2.0 // indirect
9898
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
9999
github.com/daixiang0/gci v0.10.1 // indirect
100-
github.com/davecgh/go-spew v1.1.1 // indirect
100+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
101101
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
102-
github.com/deepmap/oapi-codegen v1.8.2 // indirect
103102
github.com/denis-tingaikin/go-header v0.4.3 // indirect
104103
github.com/dgraph-io/badger/v2 v2.2007.4 // indirect
105104
github.com/dgraph-io/ristretto v0.1.1 // indirect
@@ -121,7 +120,7 @@ require (
121120
github.com/fatih/structtag v1.2.0 // indirect
122121
github.com/felixge/fgprof v0.9.3 // indirect
123122
github.com/firefart/nonamedreturns v1.0.4 // indirect
124-
github.com/fsnotify/fsnotify v1.6.0 // indirect
123+
github.com/fsnotify/fsnotify v1.7.0 // indirect
125124
github.com/fzipp/gocyclo v0.6.0 // indirect
126125
github.com/go-chi/chi/v5 v5.0.8 // indirect
127126
github.com/go-critic/go-critic v0.7.0 // indirect
@@ -171,13 +170,13 @@ require (
171170
github.com/hexops/gotextdiff v1.0.3 // indirect
172171
github.com/iancoleman/strcase v0.2.0 // indirect
173172
github.com/inconshreveable/mousetrap v1.1.0 // indirect
174-
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
175173
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
176174
github.com/jdxcode/netrc v0.0.0-20221124155335-4616370d1a84 // indirect
177175
github.com/jgautheron/goconst v1.5.1 // indirect
178176
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
179177
github.com/jinzhu/copier v0.3.5 // indirect
180178
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
179+
github.com/jmespath/go-jmespath v0.4.0 // indirect
181180
github.com/jmhodges/levigo v1.0.0 // indirect
182181
github.com/julz/importas v0.1.0 // indirect
183182
github.com/junk1tm/musttag v0.5.0 // indirect
@@ -219,24 +218,26 @@ require (
219218
github.com/opencontainers/go-digest v1.0.0 // indirect
220219
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
221220
github.com/opencontainers/runc v1.1.3 // indirect
222-
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
221+
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
223222
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
224223
github.com/pjbgf/sha1cd v0.3.0 // indirect
225224
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
226225
github.com/pkg/profile v1.7.0 // indirect
227-
github.com/pmezard/go-difflib v1.0.0 // indirect
228-
github.com/polyfloyd/go-errorlint v1.4.0 // indirect
226+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
227+
github.com/polyfloyd/go-errorlint v1.4.5 // indirect
229228
github.com/prometheus/client_model v0.3.0 // indirect
230229
github.com/prometheus/common v0.42.0 // indirect
231-
github.com/prometheus/procfs v0.8.0 // indirect
232-
github.com/quasilyte/go-ruleguard v0.3.19 // indirect
230+
github.com/prometheus/procfs v0.12.0 // indirect
231+
github.com/quasilyte/go-ruleguard v0.4.0 // indirect
233232
github.com/quasilyte/gogrep v0.5.0 // indirect
234233
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect
235234
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect
236235
github.com/rs/zerolog v1.29.0 // indirect
237236
github.com/russross/blackfriday/v2 v2.1.0 // indirect
238237
github.com/ryancurrah/gomodguard v1.3.0 // indirect
239238
github.com/ryanrolds/sqlclosecheck v0.4.0 // indirect
239+
github.com/sagikazarmark/locafero v0.4.0 // indirect
240+
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
240241
github.com/sanposhiho/wastedassign/v2 v2.0.7 // indirect
241242
github.com/sashamelentyev/interfacebloat v1.1.0 // indirect
242243
github.com/sashamelentyev/usestdlibvars v1.23.0 // indirect
@@ -250,15 +251,15 @@ require (
250251
github.com/sivchari/tenv v1.7.1 // indirect
251252
github.com/skeema/knownhosts v1.2.1 // indirect
252253
github.com/sonatard/noctx v0.0.2 // indirect
254+
github.com/sourcegraph/conc v0.3.0 // indirect
253255
github.com/sourcegraph/go-diff v0.7.0 // indirect
254-
github.com/spf13/afero v1.9.3 // indirect
255-
github.com/spf13/cast v1.5.0 // indirect
256-
github.com/spf13/jwalterweatherman v1.1.0 // indirect
256+
github.com/spf13/afero v1.11.0 // indirect
257+
github.com/spf13/cast v1.6.0 // indirect
257258
github.com/spf13/pflag v1.0.5 // indirect
258259
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
259260
github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect
260261
github.com/stretchr/objx v0.5.0 // indirect
261-
github.com/subosito/gotenv v1.4.2 // indirect
262+
github.com/subosito/gotenv v1.6.0 // indirect
262263
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c // indirect
263264
github.com/tdakkota/asciicheck v0.2.0 // indirect
264265
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect
@@ -280,15 +281,15 @@ require (
280281
go.uber.org/atomic v1.10.0 // indirect
281282
go.uber.org/multierr v1.10.0 // indirect
282283
go.uber.org/zap v1.24.0 // indirect
283-
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb // indirect
284-
golang.org/x/exp/typeparams v0.0.0-20230224173230-c95f2b4c22f2 // indirect
284+
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
285+
golang.org/x/exp/typeparams v0.0.0-20230307190834-24139beb5833 // indirect
285286
golang.org/x/mod v0.12.0 // indirect
286-
golang.org/x/sync v0.3.0 // indirect
287+
golang.org/x/sync v0.5.0 // indirect
287288
golang.org/x/sys v0.18.0 // indirect
288289
golang.org/x/term v0.18.0 // indirect
289290
golang.org/x/text v0.14.0 // indirect
290291
golang.org/x/tools v0.13.0 // indirect
291-
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
292+
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
292293
gopkg.in/ini.v1 v1.67.0 // indirect
293294
gopkg.in/warnings.v0 v0.1.2 // indirect
294295
gopkg.in/yaml.v2 v2.4.0 // indirect

‎go.sum

+60-157
Large diffs are not rendered by default.

‎mempool/cat/reactor.go

+24-27
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Reactor struct {
4141
mempool *TxPool
4242
ids *mempoolIDs
4343
requests *requestScheduler
44-
traceClient *trace.Client
44+
traceClient trace.Tracer
4545
}
4646

4747
type ReactorOptions struct {
@@ -57,7 +57,7 @@ type ReactorOptions struct {
5757
MaxGossipDelay time.Duration
5858

5959
// TraceClient is the trace client for collecting trace level events
60-
TraceClient *trace.Client
60+
TraceClient trace.Tracer
6161
}
6262

6363
func (opts *ReactorOptions) VerifyAndComplete() error {
@@ -91,7 +91,7 @@ func NewReactor(mempool *TxPool, opts *ReactorOptions) (*Reactor, error) {
9191
mempool: mempool,
9292
ids: newMempoolIDs(),
9393
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
94-
traceClient: &trace.Client{},
94+
traceClient: trace.NoOpTracer(),
9595
}
9696
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
9797
return memR, nil
@@ -228,9 +228,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
228228
// NOTE: This setup also means that we can support older mempool implementations that simply
229229
// flooded the network with transactions.
230230
case *protomem.Txs:
231-
for _, tx := range msg.Txs {
232-
schema.WriteMempoolTx(memR.traceClient, e.Src.ID(), tx, schema.TransferTypeDownload, schema.CatVersionFieldValue)
233-
}
234231
protoTxs := msg.GetTxs()
235232
if len(protoTxs) == 0 {
236233
memR.Logger.Error("received empty txs from peer", "src", e.Src)
@@ -244,6 +241,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
244241
for _, tx := range protoTxs {
245242
ntx := types.Tx(tx)
246243
key := ntx.Key()
244+
schema.WriteMempoolTx(memR.traceClient, e.Src.ID(), key[:], schema.Download)
247245
// If we requested the transaction we mark it as received.
248246
if memR.requests.Has(peerID, key) {
249247
memR.requests.MarkReceived(peerID, key)
@@ -273,19 +271,19 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
273271
// 3. If we recently evicted the tx and still don't have space for it, we do nothing.
274272
// 4. Else, we request the transaction from that peer.
275273
case *protomem.SeenTx:
276-
schema.WriteMempoolPeerState(
277-
memR.traceClient,
278-
e.Src.ID(),
279-
schema.SeenTxStateUpdateFieldValue,
280-
schema.TransferTypeDownload,
281-
schema.CatVersionFieldValue,
282-
)
283274
txKey, err := types.TxKeyFromBytes(msg.TxKey)
284275
if err != nil {
285276
memR.Logger.Error("peer sent SeenTx with incorrect tx key", "err", err)
286277
memR.Switch.StopPeerForError(e.Src, err)
287278
return
288279
}
280+
schema.WriteMempoolPeerState(
281+
memR.traceClient,
282+
e.Src.ID(),
283+
schema.SeenTx,
284+
txKey[:],
285+
schema.Download,
286+
)
289287
peerID := memR.ids.GetIDForPeer(e.Src.ID())
290288
memR.mempool.PeerHasTx(peerID, txKey)
291289
// Check if we don't already have the transaction and that it was recently rejected
@@ -307,35 +305,34 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
307305
// A peer is requesting a transaction that we have claimed to have. Find the specified
308306
// transaction and broadcast it to the peer. We may no longer have the transaction
309307
case *protomem.WantTx:
310-
schema.WriteMempoolPeerState(
311-
memR.traceClient,
312-
e.Src.ID(),
313-
schema.WantTxStateUpdateFieldValue,
314-
schema.TransferTypeDownload,
315-
schema.CatVersionFieldValue,
316-
)
317308
txKey, err := types.TxKeyFromBytes(msg.TxKey)
318309
if err != nil {
319310
memR.Logger.Error("peer sent WantTx with incorrect tx key", "err", err)
320311
memR.Switch.StopPeerForError(e.Src, err)
321312
return
322313
}
314+
schema.WriteMempoolPeerState(
315+
memR.traceClient,
316+
e.Src.ID(),
317+
schema.WantTx,
318+
txKey[:],
319+
schema.Download,
320+
)
323321
tx, has := memR.mempool.Get(txKey)
324322
if has && !memR.opts.ListenOnly {
325323
peerID := memR.ids.GetIDForPeer(e.Src.ID())
326-
schema.WriteMempoolTx(
327-
memR.traceClient,
328-
e.Src.ID(),
329-
msg.TxKey,
330-
schema.TransferTypeUpload,
331-
schema.CatVersionFieldValue,
332-
)
333324
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)
334325
if p2p.SendEnvelopeShim(e.Src, p2p.Envelope{ //nolint:staticcheck
335326
ChannelID: mempool.MempoolChannel,
336327
Message: &protomem.Txs{Txs: [][]byte{tx}},
337328
}, memR.Logger) {
338329
memR.mempool.PeerHasTx(peerID, txKey)
330+
schema.WriteMempoolTx(
331+
memR.traceClient,
332+
e.Src.ID(),
333+
txKey[:],
334+
schema.Upload,
335+
)
339336
}
340337
}
341338

‎mempool/v1/mempool.go

+3-17
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/tendermint/tendermint/libs/log"
1717
"github.com/tendermint/tendermint/mempool"
1818
"github.com/tendermint/tendermint/pkg/trace"
19-
"github.com/tendermint/tendermint/pkg/trace/schema"
2019
"github.com/tendermint/tendermint/proxy"
2120
"github.com/tendermint/tendermint/types"
2221
)
@@ -59,7 +58,7 @@ type TxMempool struct {
5958
txByKey map[types.TxKey]*clist.CElement
6059
txBySender map[string]*clist.CElement // for sender != ""
6160

62-
traceClient *trace.Client
61+
traceClient trace.Tracer
6362
}
6463

6564
// NewTxMempool constructs a new, empty priority mempool at the specified
@@ -83,7 +82,7 @@ func NewTxMempool(
8382
height: height,
8483
txByKey: make(map[types.TxKey]*clist.CElement),
8584
txBySender: make(map[string]*clist.CElement),
86-
traceClient: &trace.Client{},
85+
traceClient: trace.NoOpTracer(),
8786
}
8887
if cfg.CacheSize > 0 {
8988
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
@@ -115,7 +114,7 @@ func WithMetrics(metrics *mempool.Metrics) TxMempoolOption {
115114
return func(txmp *TxMempool) { txmp.metrics = metrics }
116115
}
117116

118-
func WithTraceClient(tc *trace.Client) TxMempoolOption {
117+
func WithTraceClient(tc trace.Tracer) TxMempoolOption {
119118
return func(txmp *TxMempool) {
120119
txmp.traceClient = tc
121120
}
@@ -204,7 +203,6 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
204203
if txmp.preCheck != nil {
205204
if err := txmp.preCheck(tx); err != nil {
206205
txmp.metrics.FailedTxs.With(mempool.TypeLabel, mempool.FailedPrecheck).Add(1)
207-
schema.WriteMempoolRejected(txmp.traceClient, err.Error())
208206
return 0, mempool.ErrPreCheck{Reason: err}
209207
}
210208
}
@@ -483,15 +481,6 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
483481
)
484482

485483
txmp.metrics.FailedTxs.With(mempool.TypeLabel, mempool.FailedAdding).Add(1)
486-
reason := fmt.Sprintf(
487-
"code: %d codespace: %s logs: %s local: %v postCheck error: %v",
488-
checkTxRes.Code,
489-
checkTxRes.Codespace,
490-
checkTxRes.Log,
491-
wtx.HasPeer(0), // this checks if the peer id is local
492-
err,
493-
)
494-
schema.WriteMempoolRejected(txmp.traceClient, reason)
495484

496485
// Remove the invalid transaction from the cache, unless the operator has
497486
// instructed us to keep invalid transactions.
@@ -672,9 +661,6 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.Respons
672661
txmp.metrics.FailedTxs.With(mempool.TypeLabel, mempool.FailedRecheck).Add(1)
673662
if !txmp.config.KeepInvalidTxsInCache {
674663
txmp.cache.Remove(wtx.tx)
675-
if err != nil {
676-
schema.WriteMempoolRejected(txmp.traceClient, err.Error())
677-
}
678664
}
679665
txmp.metrics.Size.Set(float64(txmp.Size()))
680666
txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes()))

‎mempool/v1/reactor.go

+14-18
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Reactor struct {
2727
config *cfg.MempoolConfig
2828
mempool *TxMempool
2929
ids *mempoolIDs
30-
traceClient *trace.Client
30+
traceClient trace.Tracer
3131
}
3232

3333
type mempoolIDs struct {
@@ -94,7 +94,7 @@ func newMempoolIDs() *mempoolIDs {
9494
}
9595

9696
// NewReactor returns a new Reactor with the given config and mempool.
97-
func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool, traceClient *trace.Client) *Reactor {
97+
func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool, traceClient trace.Tracer) *Reactor {
9898
memR := &Reactor{
9999
config: config,
100100
mempool: mempool,
@@ -180,15 +180,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
180180
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
181181
switch msg := e.Message.(type) {
182182
case *protomem.Txs:
183-
for _, tx := range msg.Txs {
184-
schema.WriteMempoolTx(
185-
memR.traceClient,
186-
e.Src.ID(),
187-
tx,
188-
schema.TransferTypeDownload,
189-
schema.V1VersionFieldValue,
190-
)
191-
}
192183
protoTxs := msg.GetTxs()
193184
if len(protoTxs) == 0 {
194185
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
@@ -202,6 +193,12 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
202193
var err error
203194
for _, tx := range protoTxs {
204195
ntx := types.Tx(tx)
196+
schema.WriteMempoolTx(
197+
memR.traceClient,
198+
e.Src.ID(),
199+
ntx.Hash(),
200+
schema.Download,
201+
)
205202
err = memR.mempool.CheckTx(ntx, nil, txInfo)
206203
if errors.Is(err, mempool.ErrTxInCache) {
207204
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
@@ -302,14 +299,13 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
302299
// record that we have sent the peer the transaction
303300
// to avoid doing it a second time
304301
memTx.SetPeer(peerID)
302+
schema.WriteMempoolTx(
303+
memR.traceClient,
304+
peer.ID(),
305+
memTx.tx.Hash(),
306+
schema.Upload,
307+
)
305308
}
306-
schema.WriteMempoolTx(
307-
memR.traceClient,
308-
peer.ID(),
309-
memTx.tx,
310-
schema.TransferTypeUpload,
311-
schema.V1VersionFieldValue,
312-
)
313309
}
314310

315311
select {

‎mempool/v1/reactor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
164164
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
165165
defer cleanup()
166166

167-
reactors[i] = NewReactor(config.Mempool, mempool, &trace.Client{}) // so we dont start the consensus states
167+
reactors[i] = NewReactor(config.Mempool, mempool, trace.NoOpTracer()) // so we dont start the consensus states
168168
reactors[i].SetLogger(logger.With("validator", i))
169169
}
170170

‎node/node.go

+11-13
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ type Node struct {
234234
blockIndexer indexer.BlockIndexer
235235
indexerService *txindex.IndexerService
236236
prometheusSrv *http.Server
237-
influxDBClient *trace.Client
237+
tracer trace.Tracer
238238
pyroscopeProfiler *pyroscope.Profiler
239239
pyroscopeTracer *sdktrace.TracerProvider
240240
}
@@ -378,7 +378,7 @@ func createMempoolAndMempoolReactor(
378378
state sm.State,
379379
memplMetrics *mempl.Metrics,
380380
logger log.Logger,
381-
traceClient *trace.Client,
381+
traceClient trace.Tracer,
382382
) (mempl.Mempool, p2p.Reactor) {
383383
switch config.Mempool.Version {
384384
case cfg.MempoolV2:
@@ -515,7 +515,7 @@ func createConsensusReactor(config *cfg.Config,
515515
waitSync bool,
516516
eventBus *types.EventBus,
517517
consensusLogger log.Logger,
518-
traceClient *trace.Client,
518+
traceClient trace.Tracer,
519519
) (*cs.Reactor, *cs.State) {
520520
consensusState := cs.NewState(
521521
config.Consensus,
@@ -856,11 +856,9 @@ func NewNode(config *cfg.Config,
856856

857857
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID, softwareVersion)
858858

859-
// create an optional influxdb client to send arbitrary data to a remote
860-
// influxdb server. This is used to collect trace data from many different nodes
861-
// in a network.
862-
influxdbClient, err := trace.NewClient(
863-
config.Instrumentation,
859+
// create an optional tracer client to collect trace data.
860+
tracer, err := trace.NewTracer(
861+
config,
864862
logger,
865863
genDoc.ChainID,
866864
string(nodeKey.ID()),
@@ -870,7 +868,7 @@ func NewNode(config *cfg.Config,
870868
}
871869

872870
// Make MempoolReactor
873-
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, influxdbClient)
871+
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, tracer)
874872

875873
// Make Evidence Reactor
876874
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
@@ -903,7 +901,7 @@ func NewNode(config *cfg.Config,
903901
}
904902
consensusReactor, consensusState := createConsensusReactor(
905903
config, state, blockExec, blockStore, mempool, evidencePool,
906-
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, influxdbClient,
904+
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, tracer,
907905
)
908906

909907
// Set up state sync reactor, and schedule a sync if requested.
@@ -1001,7 +999,7 @@ func NewNode(config *cfg.Config,
1001999
indexerService: indexerService,
10021000
blockIndexer: blockIndexer,
10031001
eventBus: eventBus,
1004-
influxDBClient: influxdbClient,
1002+
tracer: tracer,
10051003
}
10061004
node.BaseService = *service.NewBaseService(logger, "Node", node)
10071005

@@ -1150,8 +1148,8 @@ func (n *Node) OnStop() {
11501148
}
11511149
}
11521150

1153-
if n.influxDBClient != nil {
1154-
n.influxDBClient.Stop()
1151+
if n.tracer != nil {
1152+
n.tracer.Stop()
11551153
}
11561154

11571155
if n.pyroscopeProfiler != nil {

‎pkg/trace/README.md

+59-109
Original file line numberDiff line numberDiff line change
@@ -1,136 +1,86 @@
1-
# trace: push arbitrary trace level data to an influxdb instance
1+
# trace package
22

3-
This package has code to create a client that can be used to push events to an
4-
influxdb instance. It is used to collect trace data from many different nodes in
5-
a network. If there is no URL in the config.toml, then the underlying client is
6-
nil and no points will be written. The provided chainID and nodeID are used to
7-
tag all points. The underlying client is exposed to allow for custom writes, but
8-
the WritePoint method should be used for most cases, as it enforces the schema.
3+
The `trace` package provides a decently fast way to store traces locally.
94

10-
## Usage and Schema
5+
## Usage
116

12-
To use this package, first create a new client using the `NewClient` function,
13-
then pass that client to the relevant components that need to push events. After
14-
that, you can use the `WritePoint` method to push events to influxdb. In the below
15-
example, we're pushing a point in the consensus reactor to measure exactly when
16-
each step of consensus is reached for each node.
7+
To enable the local tracer, add the following to the config.toml file:
178

18-
```go
19-
client.WritePoint(RoundStateTable, map[string]interface{}{
20-
HeightFieldKey: height,
21-
RoundFieldKey: round,
22-
StepFieldKey: step.String(),
23-
})
24-
```
25-
26-
Using this method enforces the typical schema, where we are tagging (aka
27-
indexing) each point by the chain-id and the node-id, then adding the local time
28-
of the creation of the event. If you need to push a custom point, you can use
29-
the underlying client directly. See `influxdb2.WriteAPI` for more details.
30-
31-
### Schema
32-
33-
All points in influxdb are divided into a key value pair per field. These kvs
34-
are indexed first by a "measurement", which is used as a "table" in other dbs.
35-
Additional indexes can also be added, we're using the chain-id and node-id here.
36-
This allows for us to quickly query for trace data for a specific chain and/or
37-
node.
38-
39-
```flux
40-
from(bucket: "e2e")
41-
|> range(start: -1h)
42-
|> filter(
43-
fn: (r) => r["_measurement"] == "consensus_round_state"
44-
and r.chain_id == "ci-YREG8X"
45-
and r.node_id == "0b529c309608172a29c49979394734260b42acfb"
46-
)
47-
```
9+
```toml
10+
# The tracer to use for collecting trace data.
11+
trace_type = "local"
4812

49-
We can easily retrieve all fields in a relatively standard table format by using
50-
the pivot `fluxQL` command.
13+
# The size of the batches that are sent to the database.
14+
trace_push_batch_size = 1000
5115

52-
```flux
53-
from(bucket: "mocha")
54-
|> range(start: -1h)
55-
|> filter(fn: (r) => r._measurement == "consensus_round_state")
56-
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
16+
# The list of tables that are updated when tracing. All available tables and
17+
# their schema can be found in the pkg/trace/schema package. It is represented as a
18+
# comma separate string. For example: "consensus_round_state,mempool_tx".
19+
tracing_tables = "consensus_round_state,mempool_tx"
5720
```
5821

59-
### Querying Data Using Python
60-
61-
Python can be used to quickly search for and isolate specific patterns.
62-
63-
```python
64-
from influxdb_client import InfluxDBClient
65-
from influxdb_client.client.write_api import SYNCHRONOUS
22+
Trace data will now be stored to the `.celestia-app/data/traces` directory, and
23+
save the file to the specified directory in the `table_name.jsonl` format.
6624

67-
client = InfluxDBClient(url="http://your-influx-url:8086/", token="your-influx-token", org="celestia")
25+
To read the contents of the file, open it and pass it the Decode function. This
26+
returns all of the events in that file as a slice.
6827

69-
query_api = client.query_api()
70-
71-
def create_flux_table_query(start, bucket, measurement, filter_clause):
72-
flux_table_query = f'''
73-
from(bucket: "{bucket}")
74-
|> range(start: {start})
75-
|> filter(fn: (r) => r._measurement == "{measurement}")
76-
{filter_clause}
77-
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
78-
'''
79-
return flux_table_query
80-
81-
query = create_flux_table_query("-1h", "mocha", "consenus_round_state", "")
82-
result = query_api.query(query=query)
28+
```go
29+
events, err := DecodeFile[schema.MempoolTx](file)
30+
if err != nil {
31+
return err
32+
}
8333
```
8434

85-
### Running a node with remote tracing on
35+
### Pull Based Event Collection
8636

87-
Tracing will only occur if an influxdb URL in specified either directly in the
88-
`config.toml` or as flags provided to the start sub command.
37+
Pull based event collection is where external servers connect to and pull trace
38+
data from the consensus node.
8939

90-
#### Configure in the `config.toml`
40+
To use this, change the config.toml to store traces in the
41+
.celestia-app/data/traces directory.
9142

9243
```toml
93-
#######################################################
94-
### Instrumentation Configuration Options ###
95-
#######################################################
96-
[instrumentation]
97-
98-
...
44+
# The tracer pull address specifies which address will be used for pull based
45+
# event collection. If empty, the pull based server will not be started.
46+
trace_pull_address = ":26661"
47+
```
9948

100-
# The URL of the influxdb instance to use for remote event
101-
# collection. If empty, remote event collection is disabled.
102-
influx_url = "http://your-influx-ip:8086/"
49+
To retrieve a table remotely using the pull based server, call the following
50+
function:
10351

104-
# The influxdb token to use for remote event collection.
105-
influx_token = "your-token"
52+
```go
53+
err := GetTable("http://1.2.3.4:26661", "mempool_tx", "directory to store the file")
54+
if err != nil {
55+
return err
56+
}
57+
```
10658

107-
# The influxdb bucket to use for remote event collection.
108-
influx_bucket = "e2e"
59+
This stores the data locally in the specified directory.
10960

110-
# The influxdb org to use for event remote collection.
111-
influx_org = "celestia"
11261

113-
# The size of the batches that are sent to the database.
114-
influx_batch_size = 20
62+
### Push Based Event Collection
11563

116-
# The list of tables that are updated when tracing. All available tables and
117-
# their schema can be found in the pkg/trace/schema package.
118-
influx_tables = ["consensus_round_state", "mempool_tx", ]
64+
Push based event collection is where the consensus node pushes trace data to an
65+
external server. At the moment, this is just an S3 bucket. To use this, add the
66+
following to the config.toml file:
11967

68+
```toml
69+
# TracePushConfig is the relative path of the push config.
70+
# This second config contains credentials for where and how often to
71+
# push trace data to. For example, if the config is next to this config,
72+
# it would be "push_config.json".
73+
trace_push_config = "{{ .Instrumentation.TracePushConfig }}"
12074
```
12175

122-
or
123-
124-
```sh
125-
celestia-appd start --influxdb-url=http://your-influx-ip:8086/ --influxdb-token="your-token"
126-
```
127-
128-
### e2e tests
129-
130-
To push events from e2e tests, we only need to specify the URL and the token via
131-
the cli.
76+
The push config file should look like this:
13277

133-
```bash
134-
cd test/e2e
135-
make && ./build/runner -f ./networks/ci.toml --influxdb-url=http://your-influx-ip:8086/ --influxdb-token="your-token"
78+
```json
79+
{
80+
"bucket": "bucket-name",
81+
"region": "region",
82+
"access_key": "",
83+
"secret_key": "",
84+
"push_delay": 60 // number of seconds to wait between intervals of pushing all files
85+
}
13686
```

‎pkg/trace/buffered_file.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package trace
2+
3+
import (
4+
"bufio"
5+
"os"
6+
"sync"
7+
)
8+
9+
type bufferedFile struct {
10+
mut *sync.RWMutex
11+
// file is the file that is being written to.
12+
file *os.File
13+
// writer is the buffered writer that is writing to the file.
14+
wr *bufio.Writer
15+
}
16+
17+
// newbufferedFile creates a new buffered file that writes to the given file.
18+
func newbufferedFile(file *os.File) *bufferedFile {
19+
return &bufferedFile{
20+
mut: &sync.RWMutex{},
21+
file: file,
22+
wr: bufio.NewWriter(file),
23+
}
24+
}
25+
26+
// Write writes the given bytes to the file.
27+
func (f *bufferedFile) Write(b []byte) (int, error) {
28+
f.mut.Lock()
29+
defer f.mut.Unlock()
30+
return f.wr.Write(b)
31+
}
32+
33+
// Flush flushes the writer to the file.
34+
func (f *bufferedFile) Flush() error {
35+
f.mut.Lock()
36+
defer f.mut.Unlock()
37+
return f.wr.Flush()
38+
}
39+
40+
// File returns a new copy of *os.File.
41+
func (f *bufferedFile) File() (*os.File, error) {
42+
err := f.Flush()
43+
if err != nil {
44+
return nil, err
45+
}
46+
f.mut.RLock()
47+
defer f.mut.RUnlock()
48+
return os.Open(f.file.Name())
49+
}
50+
51+
// Close closes the file.
52+
func (f *bufferedFile) Close() error {
53+
f.mut.Lock()
54+
defer f.mut.Unlock()
55+
return f.file.Close()
56+
}

‎pkg/trace/client.go

-155
This file was deleted.

‎pkg/trace/decoder.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package trace
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"io"
7+
"os"
8+
)
9+
10+
// DecodeFile reads a file and decodes it into a slice of events via
11+
// scanning. The table parameter is used to determine the type of the events.
12+
// The file should be a jsonl file. The generic here are passed to the event
13+
// type.
14+
func DecodeFile[T any](f *os.File) ([]Event[T], error) {
15+
var out []Event[T]
16+
r := bufio.NewReader(f)
17+
for {
18+
line, err := r.ReadString('\n')
19+
if err == io.EOF {
20+
break
21+
} else if err != nil {
22+
return nil, err
23+
}
24+
25+
var e Event[T]
26+
if err := json.Unmarshal([]byte(line), &e); err != nil {
27+
return nil, err
28+
}
29+
30+
out = append(out, e)
31+
}
32+
33+
return out, nil
34+
}

‎pkg/trace/doc.go

+1-100
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,2 @@
1-
/*
2-
# trace: push arbitrary trace level data to an influxdb instance
3-
4-
This package has code to create a client that can be used to push events to an
5-
influxdb instance. It is used to collect trace data from many different nodes in
6-
a network. If there is no URL in the config.toml, then the underlying client is
7-
nil and no points will be written. The provided chainID and nodeID are used to
8-
tag all points. The underlying client is exposed to allow for custom writes, but
9-
the WritePoint method should be used for most cases, as it enforces the schema.
10-
11-
## Usage and Schema
12-
13-
To use this package, first create a new client using the `NewClient` function,
14-
then pass that client to the relevant components that need to push events. After
15-
that, you can use the `WritePoint` method to push events to influxdb. In the below
16-
example, we're pushing a point in the consensus reactor to measure exactly when
17-
each step of consensus is reached for each node.
18-
19-
```go
20-
21-
if cs.traceClient.IsCollecting() {
22-
cs.traceClient.WritePoint("consensus", map[string]interface{}{
23-
"roundData": []interface{}{rs.Height, rs.Round, rs.Step},
24-
})
25-
}
26-
27-
```
28-
29-
Using this method enforces the typical schema, where we are tagging (aka
30-
indexing) each point by the chain-id and the node-id, then adding the local time
31-
of the creation of the event. If you need to push a custom point, you can use
32-
the underlying client directly. See influxdb2.WriteAPI for more details.
33-
34-
### Schema
35-
36-
All points in influxdb are divided into a key value pair per field. These kvs
37-
are indexed first by a "measurement", which is used as a "table" in other dbs.
38-
Additional indexes can also be added, we're using the chain-id and node-id here.
39-
This allows for us to quickly query for trace data for a specific chain and/or
40-
node.
41-
42-
```flux
43-
from(bucket: "e2e")
44-
45-
|> range(start: -1h)
46-
|> filter(
47-
fn: (r) => r["_measurement"] == "consensus"
48-
and r.chain_id == "ci-YREG8X"
49-
and r.node_id == "0b529c309608172a29c49979394734260b42acfb"
50-
)
51-
52-
```
53-
54-
### Running a node with remote tracing on
55-
56-
Tracing will only occur if an influxdb URL in specified either directly in the
57-
`config.toml` or as flags provided to the start sub command.
58-
59-
configure in the config.toml
60-
61-
```toml
62-
#######################################################
63-
### Instrumentation Configuration Options ###
64-
#######################################################
65-
[instrumentation]
66-
67-
...
68-
69-
# The URL of the influxdb instance to use for remote event
70-
# collection. If empty, remote event collection is disabled.
71-
influx_url = "http://your-influx-ip:8086/"
72-
73-
# The influxdb token to use for remote event collection.
74-
influx_token = "your-token"
75-
76-
# The influxdb bucket to use for remote event collection.
77-
influx_bucket = "e2e"
78-
79-
# The influxdb org to use for event remote collection.
80-
influx_org = "celestia"
81-
82-
# The size of the batches that are sent to the database.
83-
influx_batch_size = 20
84-
```
85-
86-
or
87-
```sh
88-
celestia-appd start --influxdb-url=http://your-influx-ip:8086/ --influxdb-token="your-token"
89-
```
90-
91-
### e2e tests
92-
93-
To push events from e2e tests, we only need to specify the URL and the token via
94-
the cli.
95-
96-
```bash
97-
cd test/e2e
98-
make && ./build/runner -f ./networks/ci.toml --influxdb-url=http://your-influx-ip:8086/ --influxdb-token="your-token"
99-
```
100-
*/
1+
/**/
1012
package trace

‎pkg/trace/fileserver.go

+317
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
package trace
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"mime"
10+
"mime/multipart"
11+
"net/http"
12+
"net/url"
13+
"os"
14+
"path"
15+
"path/filepath"
16+
"strings"
17+
"time"
18+
19+
"github.com/aws/aws-sdk-go/aws"
20+
"github.com/aws/aws-sdk-go/aws/credentials"
21+
"github.com/aws/aws-sdk-go/aws/session"
22+
"github.com/aws/aws-sdk-go/service/s3"
23+
)
24+
25+
func (lt *LocalTracer) getTableHandler() http.HandlerFunc {
26+
return func(w http.ResponseWriter, r *http.Request) {
27+
// Parse the request to get the data
28+
if err := r.ParseForm(); err != nil {
29+
http.Error(w, "Failed to parse form", http.StatusBadRequest)
30+
return
31+
}
32+
33+
inputString := r.FormValue("table")
34+
if inputString == "" {
35+
http.Error(w, "No data provided", http.StatusBadRequest)
36+
return
37+
}
38+
39+
f, err := lt.ReadTable(inputString)
40+
if err != nil {
41+
http.Error(w, fmt.Sprintf("failed to read table: %v", err), http.StatusInternalServerError)
42+
return
43+
}
44+
45+
// Use the pump function to continuously read from the file and write to
46+
// the response writer
47+
reader, writer := pump(inputString, bufio.NewReader(f))
48+
defer reader.Close()
49+
50+
// Set the content type to the writer's form data content type
51+
w.Header().Set("Content-Type", writer.FormDataContentType())
52+
53+
// Copy the data from the reader to the response writer
54+
if _, err := io.Copy(w, reader); err != nil {
55+
http.Error(w, "Failed to send data", http.StatusInternalServerError)
56+
return
57+
}
58+
}
59+
}
60+
61+
// pump continuously reads from a bufio.Reader and writes to a multipart.Writer.
62+
// It returns the reader end of the pipe and the writer for consumption by the
63+
// server.
64+
func pump(table string, br *bufio.Reader) (*io.PipeReader, *multipart.Writer) {
65+
r, w := io.Pipe()
66+
m := multipart.NewWriter(w)
67+
68+
go func(
69+
table string,
70+
m *multipart.Writer,
71+
w *io.PipeWriter,
72+
br *bufio.Reader,
73+
) {
74+
defer w.Close()
75+
defer m.Close()
76+
77+
part, err := m.CreateFormFile("filename", table+".jsonl")
78+
if err != nil {
79+
return
80+
}
81+
82+
if _, err = io.Copy(part, br); err != nil {
83+
return
84+
}
85+
86+
}(table, m, w, br)
87+
88+
return r, m
89+
}
90+
91+
func (lt *LocalTracer) servePullData() {
92+
mux := http.NewServeMux()
93+
mux.HandleFunc("/get_table", lt.getTableHandler())
94+
err := http.ListenAndServe(lt.cfg.Instrumentation.TracePullAddress, mux) //nolint:gosec
95+
if err != nil {
96+
lt.logger.Error("trace pull server failure", "err", err)
97+
}
98+
}
99+
100+
// GetTable downloads a table from the server and saves it to the given directory. It uses a multipart
101+
// response to download the file.
102+
func GetTable(serverURL, table, dirPath string) error {
103+
data := url.Values{}
104+
data.Set("table", table)
105+
106+
serverURL = serverURL + "/get_table"
107+
108+
resp, err := http.PostForm(serverURL, data) //nolint:gosec
109+
if err != nil {
110+
return err
111+
}
112+
defer resp.Body.Close()
113+
114+
if resp.StatusCode != http.StatusOK {
115+
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
116+
}
117+
118+
_, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
119+
if err != nil {
120+
return err
121+
}
122+
123+
boundary, ok := params["boundary"]
124+
if !ok {
125+
panic("Not a multipart response")
126+
}
127+
128+
err = os.MkdirAll(dirPath, 0755)
129+
if err != nil {
130+
return err
131+
}
132+
133+
outputFile, err := os.Create(path.Join(dirPath, table+".jsonl"))
134+
if err != nil {
135+
return err
136+
}
137+
defer outputFile.Close()
138+
139+
reader := multipart.NewReader(resp.Body, boundary)
140+
141+
for {
142+
part, err := reader.NextPart()
143+
if err == io.EOF {
144+
break // End of multipart
145+
}
146+
if err != nil {
147+
return err
148+
}
149+
150+
contentDisposition, params, err := mime.ParseMediaType(part.Header.Get("Content-Disposition"))
151+
if err != nil {
152+
return err
153+
}
154+
155+
if contentDisposition == "form-data" && params["filename"] != "" {
156+
_, err = io.Copy(outputFile, part)
157+
if err != nil {
158+
return err
159+
}
160+
}
161+
162+
part.Close()
163+
}
164+
165+
return nil
166+
}
167+
168+
// S3Config is a struct that holds the configuration for an S3 bucket.
169+
type S3Config struct {
170+
BucketName string `json:"bucket_name"`
171+
Region string `json:"region"`
172+
AccessKey string `json:"access_key"`
173+
SecretKey string `json:"secret_key"`
174+
// PushDelay is the time in seconds to wait before pushing the file to S3.
175+
// If this is 0, it defaults is used.
176+
PushDelay int64 `json:"push_delay"`
177+
}
178+
179+
// readS3Config reads an S3Config from a file in the given directory.
180+
func readS3Config(dir string) (S3Config, error) {
181+
cfg := S3Config{}
182+
f, err := os.Open(filepath.Join(dir, "s3.json"))
183+
if errors.Is(err, os.ErrNotExist) {
184+
return cfg, nil
185+
}
186+
if err != nil {
187+
return cfg, err
188+
}
189+
defer f.Close()
190+
err = json.NewDecoder(f).Decode(&cfg)
191+
if cfg.PushDelay == 0 {
192+
cfg.PushDelay = 60
193+
}
194+
return cfg, err
195+
}
196+
197+
// PushS3 pushes a file to an S3 bucket using the given S3Config. It uses the
198+
// chainID and the nodeID to organize the files in the bucket. The directory
199+
// structure is chainID/nodeID/table.jsonl .
200+
func PushS3(chainID, nodeID string, s3cfg S3Config, f *os.File) error {
201+
sess, err := session.NewSession(&aws.Config{
202+
Region: aws.String(s3cfg.Region),
203+
Credentials: credentials.NewStaticCredentials(
204+
s3cfg.AccessKey,
205+
s3cfg.SecretKey,
206+
"",
207+
),
208+
},
209+
)
210+
if err != nil {
211+
return err
212+
}
213+
214+
s3Svc := s3.New(sess)
215+
216+
key := fmt.Sprintf("%s/%s/%s", chainID, nodeID, filepath.Base(f.Name()))
217+
218+
_, err = s3Svc.PutObject(&s3.PutObjectInput{
219+
Bucket: aws.String(s3cfg.BucketName),
220+
Key: aws.String(key),
221+
Body: f,
222+
})
223+
224+
return err
225+
}
226+
227+
func (lt *LocalTracer) pushLoop() {
228+
for {
229+
time.Sleep(time.Second * time.Duration(lt.s3Config.PushDelay))
230+
err := lt.PushAll()
231+
if err != nil {
232+
lt.logger.Error("failed to push tables", "error", err)
233+
}
234+
}
235+
}
236+
237+
func (lt *LocalTracer) PushAll() error {
238+
for table := range lt.fileMap {
239+
f, err := lt.ReadTable(table)
240+
if err != nil {
241+
return err
242+
}
243+
if err := PushS3(lt.chainID, lt.nodeID, lt.s3Config, f); err != nil {
244+
return err
245+
}
246+
f.Close()
247+
}
248+
return nil
249+
}
250+
251+
// S3Download downloads files that match some prefix from an S3 bucket to a
252+
// local directory dst.
253+
func S3Download(dst, prefix string, cfg S3Config) error {
254+
// Ensure local directory structure exists
255+
err := os.MkdirAll(dst, os.ModePerm)
256+
if err != nil {
257+
return err
258+
}
259+
260+
sess, err := session.NewSession(&aws.Config{
261+
Region: aws.String(cfg.Region),
262+
Credentials: credentials.NewStaticCredentials(
263+
cfg.AccessKey,
264+
cfg.SecretKey,
265+
"",
266+
),
267+
},
268+
)
269+
if err != nil {
270+
return err
271+
}
272+
273+
s3Svc := s3.New(sess)
274+
input := &s3.ListObjectsV2Input{
275+
Bucket: aws.String(cfg.BucketName),
276+
Prefix: aws.String(prefix),
277+
Delimiter: aws.String(""),
278+
}
279+
280+
err = s3Svc.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
281+
for _, content := range page.Contents {
282+
localFilePath := filepath.Join(dst, strings.TrimPrefix(*content.Key, prefix))
283+
fmt.Printf("Downloading %s to %s\n", *content.Key, localFilePath)
284+
285+
// Create the directories in the path
286+
if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
287+
return false
288+
}
289+
290+
// Create a file to write the S3 Object contents to.
291+
f, err := os.Create(localFilePath)
292+
if err != nil {
293+
return false
294+
}
295+
296+
resp, err := s3Svc.GetObject(&s3.GetObjectInput{
297+
Bucket: aws.String(cfg.BucketName),
298+
Key: aws.String(*content.Key),
299+
})
300+
if err != nil {
301+
f.Close()
302+
continue
303+
}
304+
defer resp.Body.Close()
305+
306+
// Copy the contents of the S3 object to the local file
307+
if _, err := io.Copy(f, resp.Body); err != nil {
308+
return false
309+
}
310+
311+
fmt.Printf("Successfully downloaded %s to %s\n", *content.Key, localFilePath)
312+
f.Close()
313+
}
314+
return !lastPage // continue paging
315+
})
316+
return err
317+
}

‎pkg/trace/flags.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package trace
22

33
const (
4-
FlagInfluxDBURL = "influxdb-url"
5-
FlagInfluxDBToken = "influxdb-token"
6-
FlagInfluxDBURLDescription = "URL of the InfluxDB instance to use for arbitrary data collection. If not specified, data will not be collected"
7-
FlagInfluxDBTokenDescription = "Token to use when writing to the InfluxDB instance. Must be specified if 'influxdb-url' is specified" //nolint:gosec
4+
FlagTracePushConfig = "trace-push-url"
5+
FlagTracePullAddress = "trace-pull-address"
6+
FlagTracePushConfigDescription = "URL of the trace push server"
7+
FlagTracePullAddressDescription = "address to listen on for pulling trace data"
88

99
FlagPyroscopeURL = "pyroscope-url"
1010
FlagPyroscopeURLDescription = "URL of the Pyroscope instance to use for continuous profiling. If not specified, profiling will not be enabled"

‎pkg/trace/local_tracer.go

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package trace
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"os"
7+
"path"
8+
"strings"
9+
"time"
10+
11+
"github.com/tendermint/tendermint/config"
12+
"github.com/tendermint/tendermint/libs/log"
13+
)
14+
15+
// Event wraps some trace data with metadata that dictates the table and things
16+
// like the chainID and nodeID.
17+
type Event[T any] struct {
18+
ChainID string `json:"chain_id"`
19+
NodeID string `json:"node_id"`
20+
Table string `json:"table"`
21+
Timestamp time.Time `json:"timestamp"`
22+
Msg T `json:"msg"`
23+
}
24+
25+
// NewEvent creates a new Event with the given chainID, nodeID, table, and msg.
26+
// It adds the current time as the timestamp.
27+
func NewEvent[T any](chainID, nodeID, table string, msg T) Event[T] {
28+
return Event[T]{
29+
ChainID: chainID,
30+
NodeID: nodeID,
31+
Table: table,
32+
Msg: msg,
33+
Timestamp: time.Now(),
34+
}
35+
}
36+
37+
// LocalTracer saves all of the events passed to the retuen channel to files
38+
// based on their "type" (a string field in the event). Each type gets its own
39+
// file. The internals are purposefully not *explicitly* thread safe to avoid the
40+
// overhead of locking with each event save. Only pass events to the returned
41+
// channel. Call CloseAll to close all open files.
42+
type LocalTracer struct {
43+
chainID, nodeID string
44+
logger log.Logger
45+
cfg *config.Config
46+
s3Config S3Config
47+
48+
// fileMap maps tables to their open files files are threadsafe, but the map
49+
// is not. Therefore don't create new files after initialization to remain
50+
// threadsafe.
51+
fileMap map[string]*bufferedFile
52+
// canal is a channel for all events that are being written. It acts as an
53+
// extra buffer to avoid blocking the caller when writing to files.
54+
canal chan Event[Entry]
55+
}
56+
57+
// NewLocalTracer creates a struct that will save all of the events passed to
58+
// the retuen channel to files based on their "table" (a string field in the
59+
// event). Each type gets its own file. The internal are purposefully not thread
60+
// safe to avoid the overhead of locking with each event save. Only pass events
61+
// to the returned channel. Call CloseAll to close all open files. Goroutine to
62+
// save events is started in this function.
63+
func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) {
64+
fm := make(map[string]*bufferedFile)
65+
p := path.Join(cfg.RootDir, "data", "traces")
66+
for _, table := range splitAndTrimEmpty(cfg.Instrumentation.TracingTables, ",", " ") {
67+
fileName := fmt.Sprintf("%s/%s.jsonl", p, table)
68+
err := os.MkdirAll(p, 0700)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to create directory %s: %w", p, err)
71+
}
72+
file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
73+
if err != nil {
74+
return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err)
75+
}
76+
bf := newbufferedFile(file)
77+
fm[table] = bf
78+
}
79+
80+
lt := &LocalTracer{
81+
fileMap: fm,
82+
cfg: cfg,
83+
canal: make(chan Event[Entry], cfg.Instrumentation.TraceBufferSize),
84+
chainID: chainID,
85+
nodeID: nodeID,
86+
logger: logger,
87+
}
88+
89+
go lt.drainCanal()
90+
if cfg.Instrumentation.TracePullAddress != "" {
91+
go lt.servePullData()
92+
}
93+
if cfg.Instrumentation.TracePushConfig != "" {
94+
s3Config, err := readS3Config(path.Join(cfg.RootDir, "config", cfg.Instrumentation.TracePushConfig))
95+
if err != nil {
96+
return nil, fmt.Errorf("failed to read s3 config: %w", err)
97+
}
98+
lt.s3Config = s3Config
99+
go lt.pushLoop()
100+
}
101+
102+
return lt, nil
103+
}
104+
105+
func (lt *LocalTracer) Write(e Entry) {
106+
if !lt.IsCollecting(e.Table()) {
107+
return
108+
}
109+
lt.canal <- NewEvent(lt.chainID, lt.nodeID, e.Table(), e)
110+
}
111+
112+
// ReadTable returns a file for the given table. If the table is not being
113+
// collected, an error is returned. This method is not thread-safe.
114+
func (lt *LocalTracer) ReadTable(table string) (*os.File, error) {
115+
bf, has := lt.getFile(table)
116+
if !has {
117+
return nil, fmt.Errorf("table %s not found", table)
118+
}
119+
120+
return bf.File()
121+
}
122+
123+
func (lt *LocalTracer) IsCollecting(table string) bool {
124+
if _, has := lt.getFile(table); has {
125+
return true
126+
}
127+
return false
128+
}
129+
130+
// getFile gets a file for the given type. This method is purposely
131+
// not thread-safe to avoid the overhead of locking with each event save.
132+
func (lt *LocalTracer) getFile(table string) (*bufferedFile, bool) {
133+
f, has := lt.fileMap[table]
134+
return f, has
135+
}
136+
137+
// saveEventToFile marshals an Event into JSON and appends it to a file named after the event's Type.
138+
func (lt *LocalTracer) saveEventToFile(event Event[Entry]) error {
139+
file, has := lt.getFile(event.Table)
140+
if !has {
141+
return fmt.Errorf("table %s not found", event.Table)
142+
}
143+
144+
eventJSON, err := json.Marshal(event)
145+
if err != nil {
146+
return fmt.Errorf("failed to marshal event: %v", err)
147+
}
148+
149+
if _, err := file.Write(append(eventJSON, '\n')); err != nil {
150+
return fmt.Errorf("failed to write event to file: %v", err)
151+
}
152+
153+
return nil
154+
}
155+
156+
// draincanal takes a variadic number of channels of Event pointers and drains them into files.
157+
func (lt *LocalTracer) drainCanal() {
158+
// purposefully do not lock, and rely on the channel to provide sync
159+
// actions, to avoid overhead of locking with each event save.
160+
for ev := range lt.canal {
161+
if err := lt.saveEventToFile(ev); err != nil {
162+
lt.logger.Error("failed to save event to file", "error", err)
163+
}
164+
}
165+
}
166+
167+
// Stop optionally uploads and closes all open files.
168+
func (lt *LocalTracer) Stop() {
169+
for _, file := range lt.fileMap {
170+
err := file.Flush()
171+
if err != nil {
172+
lt.logger.Error("failed to flush file", "error", err)
173+
}
174+
err = file.Close()
175+
if err != nil {
176+
lt.logger.Error("failed to close file", "error", err)
177+
}
178+
}
179+
}
180+
181+
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
182+
// slice of the string s with all leading and trailing Unicode code points
183+
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
184+
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
185+
// -1. Also filters out empty strings, returning only non-empty strings.
186+
//
187+
// NOTE: this is copy pasted from the config package to avoid a circular
188+
// dependency. See the function of the same name for tests.
189+
func splitAndTrimEmpty(s, sep, cutset string) []string {
190+
if s == "" {
191+
return []string{}
192+
}
193+
194+
spl := strings.Split(s, sep)
195+
nonEmptyStrings := make([]string, 0, len(spl))
196+
for i := 0; i < len(spl); i++ {
197+
element := strings.Trim(spl[i], cutset)
198+
if element != "" {
199+
nonEmptyStrings = append(nonEmptyStrings, element)
200+
}
201+
}
202+
return nonEmptyStrings
203+
}

‎pkg/trace/local_tracer_test.go

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package trace
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net"
7+
"os"
8+
"path"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/require"
13+
"github.com/tendermint/tendermint/config"
14+
"github.com/tendermint/tendermint/libs/log"
15+
)
16+
17+
const (
18+
// testEventTable is the table name for the testEvent struct.
19+
testEventTable = "testEvent"
20+
)
21+
22+
type testEvent struct {
23+
City string `json:"city"`
24+
Length int `json:"length"`
25+
}
26+
27+
func (c testEvent) Table() string {
28+
return testEventTable
29+
}
30+
31+
// TestLocalTracerReadWrite tests the local client by writing some events,
32+
// reading them back and comparing them, writing at the same time as reading.
33+
func TestLocalTracerReadWrite(t *testing.T) {
34+
port, err := getFreePort()
35+
require.NoError(t, err)
36+
client := setupLocalTracer(t, port)
37+
38+
annecy := testEvent{"Annecy", 420}
39+
paris := testEvent{"Paris", 420}
40+
client.Write(annecy)
41+
client.Write(paris)
42+
43+
time.Sleep(100 * time.Millisecond)
44+
45+
f, err := client.ReadTable(testEventTable)
46+
require.NoError(t, err)
47+
48+
// write at the same time as reading to test thread safety this test will be
49+
// flaky if this is not being handled correctly
50+
migenees := testEvent{"Migennes", 620}
51+
pontivy := testEvent{"Pontivy", 720}
52+
client.Write(migenees)
53+
client.Write(pontivy)
54+
55+
events, err := DecodeFile[testEvent](f)
56+
require.NoError(t, err)
57+
require.GreaterOrEqual(t, len(events), 2)
58+
require.Equal(t, annecy, events[0].Msg)
59+
require.Equal(t, paris, events[1].Msg)
60+
f.Close()
61+
62+
time.Sleep(100 * time.Millisecond)
63+
64+
f, err = client.ReadTable(testEventTable)
65+
require.NoError(t, err)
66+
defer f.Close()
67+
events, err = DecodeFile[testEvent](f)
68+
require.NoError(t, err)
69+
require.Len(t, events, 4)
70+
require.Equal(t, migenees, events[2].Msg)
71+
require.Equal(t, pontivy, events[3].Msg)
72+
}
73+
74+
// TestLocalTracerServerPull tests the pull portion of the server.
75+
func TestLocalTracerServerPull(t *testing.T) {
76+
port, err := getFreePort()
77+
require.NoError(t, err)
78+
client := setupLocalTracer(t, port)
79+
80+
for i := 0; i < 5; i++ {
81+
client.Write(testEvent{"Annecy", i})
82+
}
83+
84+
// Wait for the server to start
85+
time.Sleep(100 * time.Millisecond)
86+
87+
// Test the server
88+
newDir := t.TempDir()
89+
90+
url := fmt.Sprintf("http://localhost:%d", port)
91+
92+
// try to read a table that is not being collected. error expected.
93+
err = GetTable(url, "canal", newDir)
94+
require.Error(t, err)
95+
96+
err = GetTable(url, testEventTable, newDir)
97+
require.NoError(t, err)
98+
99+
originalFile, err := client.ReadTable(testEventTable)
100+
require.NoError(t, err)
101+
defer originalFile.Close()
102+
originalBz, err := io.ReadAll(originalFile)
103+
require.NoError(t, err)
104+
105+
path := path.Join(newDir, testEventTable+".jsonl")
106+
downloadedFile, err := os.Open(path)
107+
require.NoError(t, err)
108+
defer downloadedFile.Close()
109+
110+
downloadedBz, err := io.ReadAll(downloadedFile)
111+
require.NoError(t, err)
112+
require.Equal(t, originalBz, downloadedBz)
113+
114+
_, err = downloadedFile.Seek(0, 0) // reset the seek on the file to read it again
115+
require.NoError(t, err)
116+
events, err := DecodeFile[testEvent](downloadedFile)
117+
require.NoError(t, err)
118+
require.Len(t, events, 5)
119+
for i := 0; i < 5; i++ {
120+
require.Equal(t, i, events[i].Msg.Length)
121+
}
122+
}
123+
124+
func setupLocalTracer(t *testing.T, port int) *LocalTracer {
125+
logger := log.NewNopLogger()
126+
cfg := config.DefaultConfig()
127+
cfg.SetRoot(t.TempDir())
128+
cfg.Instrumentation.TraceBufferSize = 100
129+
cfg.Instrumentation.TracingTables = testEventTable
130+
cfg.Instrumentation.TracePullAddress = fmt.Sprintf(":%d", port)
131+
132+
client, err := NewLocalTracer(cfg, logger, "test_chain", "test_node")
133+
if err != nil {
134+
t.Fatalf("failed to create local client: %v", err)
135+
}
136+
137+
return client
138+
}
139+
140+
// getFreePort returns a free port and optionally an error.
141+
func getFreePort() (int, error) {
142+
a, err := net.ResolveTCPAddr("tcp", "localhost:0")
143+
if err != nil {
144+
return 0, err
145+
}
146+
147+
l, err := net.ListenTCP("tcp", a)
148+
if err != nil {
149+
return 0, err
150+
}
151+
defer l.Close()
152+
return l.Addr().(*net.TCPAddr).Port, nil
153+
}

‎pkg/trace/schema/consensus.go

+102-116
Original file line numberDiff line numberDiff line change
@@ -21,162 +21,148 @@ func ConsensusTables() []string {
2121
// Schema constants for the consensus round state tracing database.
2222
const (
2323
// RoundStateTable is the name of the table that stores the consensus
24-
// state traces. Follows this schema:
25-
//
26-
// | time | height | round | step |
24+
// state traces.
2725
RoundStateTable = "consensus_round_state"
28-
29-
// StepFieldKey is the name of the field that stores the consensus step. The
30-
// value is a string.
31-
StepFieldKey = "step"
3226
)
3327

28+
// RoundState describes schema for the "consensus_round_state" table.
29+
type RoundState struct {
30+
Height int64 `json:"height"`
31+
Round int32 `json:"round"`
32+
Step cstypes.RoundStepType `json:"step"`
33+
}
34+
35+
// Table returns the table name for the RoundState struct.
36+
func (r RoundState) Table() string {
37+
return RoundStateTable
38+
}
39+
3440
// WriteRoundState writes a tracing point for a tx using the predetermined
35-
// schema for consensus state tracing. This is used to create a table in the following
36-
// schema:
37-
//
38-
// | time | height | round | step |
39-
func WriteRoundState(client *trace.Client, height int64, round int32, step cstypes.RoundStepType) {
40-
client.WritePoint(RoundStateTable, map[string]interface{}{
41-
HeightFieldKey: height,
42-
RoundFieldKey: round,
43-
StepFieldKey: step.String(),
44-
})
41+
// schema for consensus state tracing.
42+
func WriteRoundState(client trace.Tracer, height int64, round int32, step cstypes.RoundStepType) {
43+
client.Write(RoundState{Height: height, Round: round, Step: step})
4544
}
4645

4746
// Schema constants for the "consensus_block_parts" table.
4847
const (
4948
// BlockPartsTable is the name of the table that stores the consensus block
5049
// parts.
51-
// following schema:
52-
//
53-
// | time | height | round | index | peer | transfer type |
5450
BlockPartsTable = "consensus_block_parts"
55-
56-
// BlockPartIndexFieldKey is the name of the field that stores the block
57-
// part
58-
BlockPartIndexFieldKey = "index"
5951
)
6052

53+
// BlockPart describes schema for the "consensus_block_parts" table.
54+
type BlockPart struct {
55+
Height int64 `json:"height"`
56+
Round int32 `json:"round"`
57+
Index int32 `json:"index"`
58+
Peer p2p.ID `json:"peer"`
59+
TransferType TransferType `json:"transfer_type"`
60+
}
61+
62+
// Table returns the table name for the BlockPart struct.
63+
func (b BlockPart) Table() string {
64+
return BlockPartsTable
65+
}
66+
6167
// WriteBlockPart writes a tracing point for a BlockPart using the predetermined
62-
// schema for consensus state tracing. This is used to create a table in the
63-
// following schema:
64-
//
65-
// | time | height | round | index | peer | transfer type |
68+
// schema for consensus state tracing.
6669
func WriteBlockPart(
67-
client *trace.Client,
70+
client trace.Tracer,
6871
height int64,
6972
round int32,
7073
peer p2p.ID,
7174
index uint32,
72-
transferType string,
75+
transferType TransferType,
7376
) {
7477
// this check is redundant to what is checked during Write, although it
7578
// is an optimization to avoid allocations from the map of fields.
7679
if !client.IsCollecting(BlockPartsTable) {
7780
return
7881
}
79-
client.WritePoint(BlockPartsTable, map[string]interface{}{
80-
HeightFieldKey: height,
81-
RoundFieldKey: round,
82-
BlockPartIndexFieldKey: index,
83-
PeerFieldKey: peer,
84-
TransferTypeFieldKey: transferType,
85-
})
86-
}
87-
88-
const (
89-
// BlockTable is the name of the table that stores metadata about consensus blocks.
90-
// following schema:
91-
//
92-
// | time | height | timestamp |
93-
BlockTable = "consensus_block"
94-
95-
// UnixMillisecondTimestampFieldKey is the name of the field that stores the timestamp in
96-
// the last commit in unix milliseconds.
97-
UnixMillisecondTimestampFieldKey = "unix_millisecond_timestamp"
98-
99-
// TxCountFieldKey is the name of the field that stores the number of
100-
// transactions in the block.
101-
TxCountFieldKey = "tx_count"
102-
103-
// SquareSizeFieldKey is the name of the field that stores the square size
104-
// of the block. SquareSize is the number of shares in a single row or
105-
// column of the origianl data square.
106-
SquareSizeFieldKey = "square_size"
107-
108-
// BlockSizeFieldKey is the name of the field that stores the size of
109-
// the block data in bytes.
110-
BlockSizeFieldKey = "block_size"
111-
112-
// ProposerFieldKey is the name of the field that stores the proposer of
113-
// the block.
114-
ProposerFieldKey = "proposer"
115-
116-
// LastCommitRoundFieldKey is the name of the field that stores the round
117-
// of the last commit.
118-
LastCommitRoundFieldKey = "last_commit_round"
119-
)
120-
121-
func WriteBlock(client *trace.Client, block *types.Block, size int) {
122-
client.WritePoint(BlockTable, map[string]interface{}{
123-
HeightFieldKey: block.Height,
124-
UnixMillisecondTimestampFieldKey: block.Time.UnixMilli(),
125-
TxCountFieldKey: len(block.Data.Txs),
126-
SquareSizeFieldKey: block.SquareSize,
127-
BlockSizeFieldKey: size,
128-
ProposerFieldKey: block.ProposerAddress.String(),
129-
LastCommitRoundFieldKey: block.LastCommit.Round,
82+
client.Write(BlockPart{
83+
Height: height,
84+
Round: round,
85+
Index: int32(index),
86+
Peer: peer,
87+
TransferType: transferType,
13088
})
13189
}
13290

13391
// Schema constants for the consensus votes tracing database.
13492
const (
13593
// VoteTable is the name of the table that stores the consensus
136-
// voting traces. Follows this schema:
137-
//
138-
// | time | height | round | vote_type | vote_height | vote_round
139-
// | vote_block_id| vote_unix_millisecond_timestamp
140-
// | vote_validator_address | vote_validator_index | peer
141-
// | transfer_type |
94+
// voting traces.
14295
VoteTable = "consensus_vote"
143-
144-
VoteTypeFieldKey = "vote_type"
145-
VoteHeightFieldKey = "vote_height"
146-
VoteRoundFieldKey = "vote_round"
147-
VoteBlockIDFieldKey = "vote_block_id"
148-
VoteTimestampFieldKey = "vote_unix_millisecond_timestamp"
149-
ValidatorAddressFieldKey = "vote_validator_address"
150-
ValidatorIndexFieldKey = "vote_validator_index"
15196
)
15297

98+
// Vote describes schema for the "consensus_vote" table.
99+
type Vote struct {
100+
Height int64 `json:"height"`
101+
Round int32 `json:"round"`
102+
VoteType string `json:"vote_type"`
103+
VoteHeight int64 `json:"vote_height"`
104+
VoteRound int32 `json:"vote_round"`
105+
VoteMillisecondTimestamp int64 `json:"vote_unix_millisecond_timestamp"`
106+
ValidatorAddress string `json:"vote_validator_address"`
107+
Peer p2p.ID `json:"peer"`
108+
TransferType TransferType `json:"transfer_type"`
109+
}
110+
111+
func (v Vote) Table() string {
112+
return VoteTable
113+
}
114+
153115
// WriteVote writes a tracing point for a vote using the predetermined
154116
// schema for consensus vote tracing.
155-
// This is used to create a table in the following
156-
// schema:
157-
//
158-
// | time | height | round | vote_type | vote_height | vote_round
159-
// | vote_block_id| vote_unix_millisecond_timestamp
160-
// | vote_validator_address | vote_validator_index | peer
161-
// | transfer_type |
162-
func WriteVote(client *trace.Client,
117+
func WriteVote(client trace.Tracer,
163118
height int64, // height of the current peer when it received/sent the vote
164119
round int32, // round of the current peer when it received/sent the vote
165120
vote *types.Vote, // vote received by the current peer
166121
peer p2p.ID, // the peer from which it received the vote or the peer to which it sent the vote
167-
transferType string, // download (received) or upload(sent)
122+
transferType TransferType, // download (received) or upload(sent)
168123
) {
169-
client.WritePoint(VoteTable, map[string]interface{}{
170-
HeightFieldKey: height,
171-
RoundFieldKey: round,
172-
VoteTypeFieldKey: vote.Type.String(),
173-
VoteHeightFieldKey: vote.Height,
174-
VoteRoundFieldKey: vote.Round,
175-
VoteBlockIDFieldKey: vote.BlockID.Hash.String(),
176-
VoteTimestampFieldKey: vote.Timestamp.UnixMilli(),
177-
ValidatorAddressFieldKey: vote.ValidatorAddress.String(),
178-
ValidatorIndexFieldKey: vote.ValidatorIndex,
179-
PeerFieldKey: peer,
180-
TransferTypeFieldKey: transferType,
124+
client.Write(Vote{
125+
Height: height,
126+
Round: round,
127+
VoteType: vote.Type.String(),
128+
VoteHeight: vote.Height,
129+
VoteRound: vote.Round,
130+
VoteMillisecondTimestamp: vote.Timestamp.UnixMilli(),
131+
ValidatorAddress: vote.ValidatorAddress.String(),
132+
Peer: peer,
133+
TransferType: transferType,
134+
})
135+
}
136+
137+
const (
138+
// BlockTable is the name of the table that stores metadata about consensus blocks.
139+
BlockTable = "consensus_block"
140+
)
141+
142+
// BlockSummary describes schema for the "consensus_block" table.
143+
type BlockSummary struct {
144+
Height int64 `json:"height"`
145+
UnixMillisecondTimestamp int64 `json:"unix_millisecond_timestamp"`
146+
TxCount int `json:"tx_count"`
147+
SquareSize uint64 `json:"square_size"`
148+
BlockSize int `json:"block_size"`
149+
Proposer string `json:"proposer"`
150+
LastCommitRound int32 `json:"last_commit_round"`
151+
}
152+
153+
func (b BlockSummary) Table() string {
154+
return BlockTable
155+
}
156+
157+
// WriteBlockSummary writes a tracing point for a block using the predetermined
158+
func WriteBlockSummary(client trace.Tracer, block *types.Block, size int) {
159+
client.Write(BlockSummary{
160+
Height: block.Height,
161+
UnixMillisecondTimestamp: block.Time.UnixMilli(),
162+
TxCount: len(block.Data.Txs),
163+
SquareSize: block.SquareSize,
164+
BlockSize: size,
165+
Proposer: block.ProposerAddress.String(),
166+
LastCommitRound: block.LastCommit.Round,
181167
})
182168
}

‎pkg/trace/schema/mempool.go

+50-82
Original file line numberDiff line numberDiff line change
@@ -4,130 +4,98 @@ import (
44
"github.com/tendermint/tendermint/libs/bytes"
55
"github.com/tendermint/tendermint/p2p"
66
"github.com/tendermint/tendermint/pkg/trace"
7-
"github.com/tendermint/tendermint/types"
87
)
98

109
// MempoolTables returns the list of tables for mempool tracing.
1110
func MempoolTables() []string {
1211
return []string{
1312
MempoolTxTable,
1413
MempoolPeerStateTable,
15-
MempoolRejectedTable,
1614
}
1715
}
1816

1917
// Schema constants for the mempool_tx table
2018
const (
2119
// MempoolTxTable is the tracing "measurement" (aka table) for the mempool
2220
// that stores tracing data related to gossiping transactions.
23-
//
24-
// The schema for this table is:
25-
// | time | peerID | tx size | tx hash | transfer type | mempool version |
2621
MempoolTxTable = "mempool_tx"
22+
)
2723

28-
// TxFieldKey is the tracing field key for receiving for sending a
29-
// tx. This should take the form of a tx hash as the value.
30-
TxFieldKey = "tx"
31-
32-
// SizeFieldKey is the tracing field key for the size of a tx. This
33-
// should take the form of the size of the tx as the value.
34-
SizeFieldKey = "size"
35-
36-
// VersionFieldKey is the tracing field key for the version of the mempool.
37-
// This is used to distinguish between versions of the mempool.
38-
VersionFieldKey = "version"
39-
40-
// V1VersionFieldValue is a tracing field value for the version of
41-
// the mempool. This value is used by the "version" field key.
42-
V1VersionFieldValue = "v1"
24+
// MempoolTx describes the schema for the "mempool_tx" table.
25+
type MempoolTx struct {
26+
TxHash string `json:"tx_hash"`
27+
Peer p2p.ID `json:"peer"`
28+
Size int `json:"size"`
29+
TransferType TransferType `json:"transfer_type"`
30+
}
4331

44-
// CatVersionFieldValue is a tracing field value for the version of
45-
// the mempool. This value is used by the "version" field key.
46-
CatVersionFieldValue = "cat"
47-
)
32+
// Table returns the table name for the MempoolTx struct.
33+
func (m MempoolTx) Table() string {
34+
return MempoolTxTable
35+
}
4836

4937
// WriteMempoolTx writes a tracing point for a tx using the predetermined
50-
// schema for mempool tracing. This is used to create a table in the following
51-
// schema:
52-
//
53-
// | time | peerID | tx size | tx hash | transfer type | mempool version |
54-
func WriteMempoolTx(client *trace.Client, peer p2p.ID, tx []byte, transferType, version string) {
38+
// schema for mempool tracing.
39+
func WriteMempoolTx(client trace.Tracer, peer p2p.ID, txHash []byte, transferType TransferType) {
5540
// this check is redundant to what is checked during Write, although it
5641
// is an optimization to avoid allocations from the map of fields.
5742
if !client.IsCollecting(MempoolTxTable) {
5843
return
5944
}
60-
client.WritePoint(MempoolTxTable, map[string]interface{}{
61-
TxFieldKey: bytes.HexBytes(types.Tx(tx).Hash()).String(),
62-
PeerFieldKey: peer,
63-
SizeFieldKey: len(tx),
64-
TransferTypeFieldKey: transferType,
65-
VersionFieldKey: version,
45+
client.Write(MempoolTx{
46+
TxHash: bytes.HexBytes(txHash).String(),
47+
Peer: peer,
48+
Size: len(txHash),
49+
TransferType: transferType,
6650
})
6751
}
6852

6953
const (
7054
// MempoolPeerState is the tracing "measurement" (aka table) for the mempool
7155
// that stores tracing data related to mempool state, specifically
7256
// the gossiping of "SeenTx" and "WantTx".
73-
//
74-
// The schema for this table is:
75-
// | time | peerID | update type | mempool version |
7657
MempoolPeerStateTable = "mempool_peer_state"
58+
)
7759

78-
// StateUpdateFieldKey is the tracing field key for state updates of the mempool.
79-
StateUpdateFieldKey = "update"
80-
81-
// SeenTxStateUpdateFieldValue is a tracing field value for the state
82-
// update of the mempool. This value is used by the "update" field key.
83-
SeenTxStateUpdateFieldValue = "seen_tx"
84-
85-
// WantTxStateUpdateFieldValue is a tracing field value for the state
86-
// update of the mempool. This value is used by the "update" field key.
87-
WantTxStateUpdateFieldValue = "want_tx"
88-
89-
// RemovedTxStateUpdateFieldValue is a tracing field value for the local
90-
// state update of the mempool. This value is used by the "update" field
91-
// key.
92-
RemovedTxStateUpdateFieldValue = "removed_tx"
60+
type MempoolStateUpdateType string
9361

94-
// AddedTxStateUpdateFieldValue is a tracing field value for the local state
95-
// update of the mempool. This value is used by the "update" field key.
96-
AddedTxStateUpdateFieldValue = "added_tx"
62+
const (
63+
SeenTx MempoolStateUpdateType = "SeenTx"
64+
WantTx MempoolStateUpdateType = "WantTx"
65+
Unknown MempoolStateUpdateType = "Unknown"
9766
)
9867

99-
// WriteMempoolPeerState writes a tracing point for the mempool state using
100-
// the predetermined schema for mempool tracing. This is used to create a table
101-
// in the following schema:
102-
//
103-
// | time | peerID | transfer type | state update | mempool version |
104-
func WriteMempoolPeerState(client *trace.Client, peer p2p.ID, stateUpdate, transferType, version string) {
105-
// this check is redundant to what is checked during WritePoint, although it
106-
// is an optimization to avoid allocations from creating the map of fields.
107-
if !client.IsCollecting(MempoolPeerStateTable) {
108-
return
109-
}
110-
client.WritePoint(MempoolPeerStateTable, map[string]interface{}{
111-
PeerFieldKey: peer,
112-
TransferTypeFieldKey: transferType,
113-
StateUpdateFieldKey: stateUpdate,
114-
VersionFieldKey: version,
115-
})
68+
// MempoolPeerState describes the schema for the "mempool_peer_state" table.
69+
type MempoolPeerState struct {
70+
Peer p2p.ID `json:"peer"`
71+
StateUpdate MempoolStateUpdateType `json:"state_update"`
72+
TxHash string `json:"tx_hash"`
73+
TransferType TransferType `json:"transfer_type"`
11674
}
11775

118-
const (
119-
MempoolRejectedTable = "mempool_rejected"
120-
ReasonFieldKey = "reason"
121-
)
76+
// Table returns the table name for the MempoolPeerState struct.
77+
func (m MempoolPeerState) Table() string {
78+
return MempoolPeerStateTable
79+
}
12280

123-
// WriteMempoolRejected records why a transaction was rejected.
124-
func WriteMempoolRejected(client *trace.Client, reason string) {
81+
// WriteMempoolPeerState writes a tracing point for the mempool state using
82+
// the predetermined schema for mempool tracing.
83+
func WriteMempoolPeerState(
84+
client trace.Tracer,
85+
peer p2p.ID,
86+
stateUpdate MempoolStateUpdateType,
87+
txHash []byte,
88+
transferType TransferType,
89+
) {
12590
// this check is redundant to what is checked during Write, although it
12691
// is an optimization to avoid allocations from creating the map of fields.
127-
if !client.IsCollecting(MempoolRejectedTable) {
92+
if !client.IsCollecting(MempoolPeerStateTable) {
12893
return
12994
}
130-
client.WritePoint(MempoolRejectedTable, map[string]interface{}{
131-
ReasonFieldKey: reason,
95+
client.Write(MempoolPeerState{
96+
Peer: peer,
97+
StateUpdate: stateUpdate,
98+
TransferType: transferType,
99+
TxHash: bytes.HexBytes(txHash).String(),
132100
})
133101
}

‎pkg/trace/schema/schema.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package schema
2+
3+
import (
4+
"strings"
5+
6+
"github.com/tendermint/tendermint/config"
7+
)
8+
9+
func init() {
10+
config.DefaultTracingTables = strings.Join(AllTables(), ",")
11+
}
12+
13+
func AllTables() []string {
14+
tables := []string{}
15+
tables = append(tables, MempoolTables()...)
16+
tables = append(tables, ConsensusTables()...)
17+
return tables
18+
}
19+
20+
type TransferType int
21+
22+
const (
23+
Download TransferType = iota
24+
Upload
25+
)
26+
27+
func (t TransferType) String() string {
28+
switch t {
29+
case Download:
30+
return "download"
31+
case Upload:
32+
return "upload"
33+
default:
34+
return "unknown"
35+
}
36+
}

‎pkg/trace/schema/schema_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package schema
2+
3+
// Define a test struct with various field types and json tags
4+
type TestStruct struct {
5+
Name string `json:"name"`
6+
Age int `json:"age"`
7+
Email string `json:"email"`
8+
}
9+
10+
// Mock for a custom type with String method
11+
type CustomType int
12+
13+
// TestStructWithCustomType includes a field with a custom type having a String method
14+
type TestStructWithCustomType struct {
15+
ID int `json:"id"`
16+
Type CustomType `json:"type"`
17+
}

‎pkg/trace/schema/tables.go

-42
This file was deleted.

‎pkg/trace/tracer.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package trace
2+
3+
import (
4+
"errors"
5+
"os"
6+
7+
"github.com/tendermint/tendermint/config"
8+
"github.com/tendermint/tendermint/libs/log"
9+
)
10+
11+
// Entry is an interface for all structs that are used to define the schema for
12+
// traces.
13+
type Entry interface {
14+
// Table defines which table the struct belongs to.
15+
Table() string
16+
}
17+
18+
// Tracer defines the methods for a client that can write and read trace data.
19+
type Tracer interface {
20+
Write(Entry)
21+
ReadTable(table string) (*os.File, error)
22+
IsCollecting(table string) bool
23+
Stop()
24+
}
25+
26+
func NewTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (Tracer, error) {
27+
switch cfg.Instrumentation.TraceType {
28+
case "local":
29+
return NewLocalTracer(cfg, logger, chainID, nodeID)
30+
case "noop":
31+
return NoOpTracer(), nil
32+
default:
33+
logger.Error("unknown tracer type, using noop", "type", cfg.Instrumentation.TraceType)
34+
return NoOpTracer(), nil
35+
}
36+
}
37+
38+
func NoOpTracer() Tracer {
39+
return &noOpTracer{}
40+
}
41+
42+
type noOpTracer struct{}
43+
44+
func (n *noOpTracer) Write(_ Entry) {}
45+
func (n *noOpTracer) ReadTable(_ string) (*os.File, error) {
46+
return nil, errors.New("no-op tracer does not support reading")
47+
}
48+
func (n *noOpTracer) IsCollecting(_ string) bool { return false }
49+
func (n *noOpTracer) Stop() {}

‎test/e2e/node/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ func startNode(cfg *Config) error {
130130
return fmt.Errorf("failed to setup config: %w", err)
131131
}
132132

133+
cmtcfg.Instrumentation.TraceType = "local"
134+
133135
n, err := node.NewNode(cmtcfg,
134136
privval.LoadOrGenFilePV(cmtcfg.PrivValidatorKeyFile(), cmtcfg.PrivValidatorStateFile()),
135137
nodeKey,

‎test/e2e/pkg/infrastructure.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,11 @@ type InfrastructureData struct {
3333
// IP addresses are expected to be within.
3434
Network string `json:"network"`
3535

36-
// InfluxDBURL is the URL of the InfluxDB instance to use for arbitrary data
37-
// collection. If not specified, data will not be collected.
38-
InfluxDBURL string `json:"influxdb_url,omitempty"`
36+
// TracePushConfig is the URL of the server to push trace data to.
37+
TracePushConfig string `json:"trace_push_config,omitempty"`
3938

40-
// InfluxDBToken is the token to use when writing to the InfluxDB instance.
41-
// Must be specified if 'influxdb-url' is specified.
42-
InfluxDBToken string `json:"influxdb_token,omitempty"`
39+
// TracePullAddress is the address to listen on for pulling trace data.
40+
TracePullAddress string `json:"trace_pull_address,omitempty"`
4341

4442
// PyroscopeURL is the URL of the pyroscope instance to use for continuous
4543
// profiling. If not specified, data will not be collected.

‎test/e2e/pkg/testnet.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ type Node struct {
106106
SendNoLoad bool
107107
Prometheus bool
108108
PrometheusProxyPort uint32
109-
InfluxDBURL string
110-
InfluxDBToken string
109+
TracePushConfig string
110+
TracePullAddress string
111111
PyroscopeURL string
112112
PyroscopeTrace bool
113113
PyroscopeProfileTypes []string
@@ -209,8 +209,8 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
209209
Perturbations: []Perturbation{},
210210
Misbehaviors: make(map[int64]string),
211211
SendNoLoad: nodeManifest.SendNoLoad,
212-
InfluxDBURL: ifd.InfluxDBURL,
213-
InfluxDBToken: ifd.InfluxDBToken,
212+
TracePushConfig: ifd.TracePushConfig,
213+
TracePullAddress: ifd.TracePullAddress,
214214
PyroscopeURL: ifd.PyroscopeURL,
215215
PyroscopeTrace: ifd.PyroscopeTrace,
216216
PyroscopeProfileTypes: ifd.PyroscopeProfileTypes,

‎test/e2e/runner/main.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,17 @@ func NewCLI() *CLI {
7878
return fmt.Errorf("unknown infrastructure type '%s'", inft)
7979
}
8080

81-
iurl, err := cmd.Flags().GetString(trace.FlagInfluxDBURL)
81+
iurl, err := cmd.Flags().GetString(trace.FlagTracePushConfig)
8282
if err != nil {
8383
return err
8484
}
85-
itoken, err := cmd.Flags().GetString(trace.FlagInfluxDBToken)
85+
itoken, err := cmd.Flags().GetString(trace.FlagTracePullAddress)
8686
if err != nil {
8787
return err
8888
}
89-
if ifd.InfluxDBURL == "" {
90-
ifd.InfluxDBURL = iurl
91-
ifd.InfluxDBToken = itoken
89+
if ifd.TracePushConfig == "" {
90+
ifd.TracePushConfig = iurl
91+
ifd.TracePullAddress = itoken
9292
}
9393

9494
purl, err := cmd.Flags().GetString(trace.FlagPyroscopeURL)
@@ -186,9 +186,9 @@ func NewCLI() *CLI {
186186

187187
cli.root.PersistentFlags().StringP("infrastructure-data", "", "", "path to the json file containing the infrastructure data. Only used if the 'infrastructure-type' is set to a value other than 'docker'")
188188

189-
cli.root.PersistentFlags().String(trace.FlagInfluxDBURL, "", trace.FlagInfluxDBURLDescription)
189+
cli.root.PersistentFlags().String(trace.FlagTracePushConfig, "", trace.FlagTracePushConfigDescription)
190190

191-
cli.root.PersistentFlags().String(trace.FlagInfluxDBToken, "", trace.FlagInfluxDBTokenDescription)
191+
cli.root.PersistentFlags().String(trace.FlagTracePullAddress, "", trace.FlagTracePullAddressDescription)
192192

193193
cli.root.PersistentFlags().String(trace.FlagPyroscopeURL, "", trace.FlagPyroscopeURLDescription)
194194

‎test/e2e/runner/setup.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,9 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
166166
cfg.DBBackend = node.Database
167167
cfg.StateSync.DiscoveryTime = 5 * time.Second
168168

169-
cfg.Instrumentation.InfluxOrg = "celestia"
170-
cfg.Instrumentation.InfluxBucket = "e2e"
171-
cfg.Instrumentation.InfluxURL = node.InfluxDBURL
172-
cfg.Instrumentation.InfluxToken = node.InfluxDBToken
169+
cfg.Instrumentation.TraceType = "celestia"
170+
cfg.Instrumentation.TracePushConfig = node.TracePushConfig
171+
cfg.Instrumentation.TracePullAddress = node.TracePullAddress
173172
cfg.Instrumentation.PyroscopeTrace = node.PyroscopeTrace
174173
cfg.Instrumentation.PyroscopeURL = node.PyroscopeURL
175174
cfg.Instrumentation.PyroscopeProfileTypes = node.PyroscopeProfileTypes

‎test/maverick/node/node.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
427427
reactor := mempoolv1.NewReactor(
428428
config.Mempool,
429429
mp,
430-
&trace.Client{},
430+
trace.NoOpTracer(),
431431
)
432432
if config.Consensus.WaitForTxs() {
433433
mp.EnableTxsAvailable()

0 commit comments

Comments
 (0)
Please sign in to comment.