diff --git a/_examples/bulk/benchmarks/benchmarks.go b/_examples/bulk/benchmarks/benchmarks.go index 9c984d38d1..566c525a58 100644 --- a/_examples/bulk/benchmarks/benchmarks.go +++ b/_examples/bulk/benchmarks/benchmarks.go @@ -29,7 +29,6 @@ package main import ( "flag" "io" - "io/ioutil" "log" "net/http" "os" @@ -139,7 +138,7 @@ func main() { log.Fatalf("Error creating runner: %s", err) } - done := make(chan os.Signal) + done := make(chan os.Signal, 1) signal.Notify(done, os.Interrupt) go func() { <-done; log.Println("\r" + strings.Repeat("▁", 110)); runner.Report(); os.Exit(0) }() @@ -247,7 +246,7 @@ func (t *fasthttpTransport) copyResponse(dst *http.Response, src *fasthttp.Respo dst.Header.Set(string(k), string(v)) }) - dst.Body = ioutil.NopCloser(strings.NewReader(string(src.Body()))) + dst.Body = io.NopCloser(strings.NewReader(string(src.Body()))) return dst } diff --git a/_examples/bulk/kafka/consumer/consumer.go b/_examples/bulk/kafka/consumer/consumer.go index 08cd2eb25f..c9e9236a24 100644 --- a/_examples/bulk/kafka/consumer/consumer.go +++ b/_examples/bulk/kafka/consumer/consumer.go @@ -58,6 +58,9 @@ func (c *Consumer) Run(ctx context.Context) (err error) { ReadLagInterval: 1 * time.Second, }) + defer c.reader.Close() + defer c.Indexer.Close(ctx) + for { msg, err := c.reader.ReadMessage(ctx) if err != nil { @@ -91,10 +94,6 @@ func (c *Consumer) Run(ctx context.Context) (err error) { return fmt.Errorf("indexer: %s", err) } } - c.reader.Close() - c.Indexer.Close(ctx) - - return nil } type Stats struct { diff --git a/_examples/bulk/kafka/go.mod b/_examples/bulk/kafka/go.mod index eff6c665a5..3827453f01 100644 --- a/_examples/bulk/kafka/go.mod +++ b/_examples/bulk/kafka/go.mod @@ -1,6 +1,7 @@ module github.com/elastic/go-elasticsearch/v8/_examples/bulk/kafka go 1.21 + toolchain go1.21.0 replace github.com/elastic/go-elasticsearch/v8 => ../../.. @@ -8,11 +9,12 @@ replace github.com/elastic/go-elasticsearch/v8 => ../../.. require ( github.com/elastic/go-elasticsearch/v8 v8.0.0-20210817150010-57d659deaca7 github.com/segmentio/kafka-go v0.4.25 - go.elastic.co/apm v1.14.0 + go.elastic.co/apm v1.15.0 ) require ( github.com/armon/go-radix v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/elastic/go-licenser v0.3.1 // indirect github.com/elastic/go-sysinfo v1.1.1 // indirect @@ -27,6 +29,8 @@ require ( github.com/pkg/errors v0.8.1 // indirect github.com/prometheus/procfs v0.0.3 // indirect github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect + go.elastic.co/apm/module/apmelasticsearch v1.15.0 // indirect + go.elastic.co/apm/module/apmhttp v1.15.0 // indirect go.elastic.co/fastjson v1.1.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect diff --git a/_examples/bulk/kafka/go.sum b/_examples/bulk/kafka/go.sum index f1be7e8b0d..b9373af684 100644 --- a/_examples/bulk/kafka/go.sum +++ b/_examples/bulk/kafka/go.sum @@ -3,6 +3,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= @@ -66,6 +68,12 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.elastic.co/apm v1.14.0 h1:9yilcTbWpqhfyunUj6/SDpZbR4FOVB50xQgODe0TW/0= go.elastic.co/apm v1.14.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY= +go.elastic.co/apm v1.15.0 h1:uPk2g/whK7c7XiZyz/YCUnAUBNPiyNeE3ARX3G6Gx7Q= +go.elastic.co/apm v1.15.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY= +go.elastic.co/apm/module/apmelasticsearch v1.15.0 h1:c5/qg+9AYe1QCGhu7FGqoydY9NNkNzc+iRpJJXRK/WE= +go.elastic.co/apm/module/apmelasticsearch v1.15.0/go.mod h1:TO6L5GJoJNavqJfksrUnnEG485VNQvZRpsmrwJW9LN8= +go.elastic.co/apm/module/apmhttp v1.15.0 h1:Le/DhI0Cqpr9wG/NIGOkbz7+rOMqJrfE4MRG6q/+leU= +go.elastic.co/apm/module/apmhttp v1.15.0/go.mod h1:NruY6Jq8ALLzWUVUQ7t4wIzn+onKoiP5woJJdTV7GMg= go.elastic.co/fastjson v1.1.0 h1:3MrGBWWVIxe/xvsbpghtkFoPciPhOCmjsR/HfwEeQR4= go.elastic.co/fastjson v1.1.0/go.mod h1:boNGISWMjQsUPy/t6yqt2/1Wx4YNPSe+mZjlyw9vKKI= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= @@ -103,8 +111,10 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200509030707-2212a7e161a5/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -116,6 +126,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/bulk/kafka/kafka.go b/_examples/bulk/kafka/kafka.go index e5a77de315..d0ece9406e 100644 --- a/_examples/bulk/kafka/kafka.go +++ b/_examples/bulk/kafka/kafka.go @@ -105,9 +105,9 @@ func main() { indexers []esutil.BulkIndexer ) - done := make(chan os.Signal) + done := make(chan os.Signal, 1) signal.Notify(done, os.Interrupt) - go func() { <-done; log.Println("\n"); os.Exit(0) }() + go func() { <-done; log.Println(""); os.Exit(0) }() // Set up producers // diff --git a/_examples/bulk/kafka/producer/producer.go b/_examples/bulk/kafka/producer/producer.go index 4ea8d075c5..407551d1ab 100644 --- a/_examples/bulk/kafka/producer/producer.go +++ b/_examples/bulk/kafka/producer/producer.go @@ -35,7 +35,6 @@ var ( ) func init() { - rand.Seed(time.Now().UnixNano()) kafka.DefaultClientID = "go-elasticsearch-kafka-demo" } @@ -61,26 +60,22 @@ func (p *Producer) Run(ctx context.Context) error { Brokers: []string{p.BrokerURL}, Topic: p.TopicName, }) + defer p.writer.Close() ticker := time.NewTicker(time.Second) + defer ticker.Stop() - for { - select { - case t := <-ticker.C: - for i := 1; i <= p.MessageRate; i++ { - messages = append(messages, kafka.Message{Value: p.generateMessage(t)}) - } - if err := p.writer.WriteMessages(ctx, messages...); err != nil { - messages = messages[:0] - return err - } + for t := range ticker.C { + for i := 1; i <= p.MessageRate; i++ { + messages = append(messages, kafka.Message{Value: p.generateMessage(t)}) + } + if err := p.writer.WriteMessages(ctx, messages...); err != nil { messages = messages[:0] + return err } + messages = messages[:0] } - p.writer.Close() - ticker.Stop() - return nil }