From 5f35ce8f2a0ee2acb2ae7626bf90ea12666d4025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com> Date: Mon, 25 Oct 2021 13:11:05 +0300 Subject: [PATCH 1/8] consumer: lower old RDYs first, then assign to new connection Currently when a consumer is connected to a new NSQD, the connection is added to the pool of connections and then a loop iterates over all of them, rebalancing the RDY values. The iteration order of a map is random, thus it can happen that the consumer tries to assign the RDY value to the new connection, before decreasing the existing ones. This leads to a RDY of 0 being assigned. This issue is even more pronounced when there are only two connections: initially all the RDY are assigned the existing connection, and when a new NSQD is connected, the old one needs to be cut in half and the half just taken away gets assigned to the new connection. If map iteration happens in reverse order - starting with the new connection - it will get assigned 0 RDY and then the old connection gets cut in half. The end result is blocked communication on the new NSQD instance until a new round of rebalance is triggered. This issue may be less relevant in long running processes, but it is very annoying in tests where we're adding and remocing NSQD instances and the test hangs from time to time due to a flaky RDY allocation. The issue is fixed by fist iterating over all the existing connections and rebalancing them, and only at the very end calling maybeUpdateRDY on the new NSQD instance. --- consumer.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index b4d7487b..895c89a5 100644 --- a/consumer.go +++ b/consumer.go @@ -609,8 +609,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error { // pre-emptive signal to existing connections to lower their RDY count for _, c := range r.conns() { - r.maybeUpdateRDY(c) + if c != conn { + r.maybeUpdateRDY(c) + } } + r.maybeUpdateRDY(conn) return nil } @@ -912,7 +915,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) { count := r.perConnMaxInFlight() r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count) - r.updateRDY(conn, count) + if err := r.updateRDY(conn, count); err != nil { + r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err) + } } func (r *Consumer) rdyLoop() { From 00567053fbb870b32790334c2fe68a32aaf4a3b2 Mon Sep 17 00:00:00 2001 From: Pierce Lopez <pierce.lopez@gmail.com> Date: Sun, 5 Jun 2022 17:33:43 -0400 Subject: [PATCH 2/8] producer: handle case of no transactions in popTransaction() This corner case generally should not happen, but a nsqd bug or strange network condition could possibly send erroneous bytes that are interpreted as a response. The Producer should be robust, and not panic the whole process. --- producer.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/producer.go b/producer.go index 20fd0c87..853714f4 100644 --- a/producer.go +++ b/producer.go @@ -367,6 +367,17 @@ exit: } func (w *Producer) popTransaction(frameType int32, data []byte) { + if len(w.transactions) == 0 { + dataLen := len(data) + if dataLen > 32 { + data = data[:32] + } + w.log(LogLevelError, + "(%s) unexpected response type=%d len=%d data[:32]=0x%x", + w.conn.String(), frameType, dataLen, data) + w.close() + return + } t := w.transactions[0] w.transactions = w.transactions[1:] if frameType == FrameTypeError { From dc8315d50ebc69c13fd8eb3859d8ef6b690c9009 Mon Sep 17 00:00:00 2001 From: Jorge Carpio <carpioldc@gmail.com> Date: Thu, 12 Jan 2023 17:48:02 +0100 Subject: [PATCH 3/8] Limit read message size - Add MaxMsgSize to configuration mimicking the nsqd cofiguration key. It defaults to 1048576 as the nsqd config -- source: https://github.com/nsqio/nsq/blob/a4939964f6715edd27a6904b87c2f9eb6a45e749/nsqd/options.go#L130 - Pass MaxMsgSize to ReadResponse via its caller, ReadUnpackedResponse. Check msgSize does not exceed the maximum in ReadResponse. This reduces the risk of attempting to read arbitrary (non-nsq) responses. - Generate custom error if msgSize is the result of deserializing the first 4 bytes of an HTTP response (1213486160) to facilitate troublehooting. Default maxMsgSize to 0 for no limit Add unexpected HTTP response test Fix indentation --- config.go | 3 +++ conn.go | 12 ++++++------ producer_test.go | 23 +++++++++++++++++++++++ protocol.go | 17 ++++++++++++++--- 4 files changed, 46 insertions(+), 9 deletions(-) diff --git a/config.go b/config.go index 1f7ea2cd..38fffe45 100644 --- a/config.go +++ b/config.go @@ -177,6 +177,9 @@ type Config struct { // The server-side message timeout for messages delivered to this client MsgTimeout time.Duration `opt:"msg_timeout" min:"0"` + // Maximum size of a single message in bytes (0 means no limit) + MaxMsgSize int32 `opt:"max_msg_size" min:"0" default:"0"` + // Secret for nsqd authentication (requires nsqd 0.2.29+) AuthSecret string `opt:"auth_secret"` // Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries diff --git a/conn.go b/conn.go index 6fa1ce89..0c382550 100644 --- a/conn.go +++ b/conn.go @@ -355,7 +355,7 @@ func (c *Conn) identify() (*IdentifyResponse, error) { return nil, ErrIdentify{err.Error()} } - frameType, data, err := ReadUnpackedResponse(c) + frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return nil, ErrIdentify{err.Error()} } @@ -434,7 +434,7 @@ func (c *Conn) upgradeTLS(tlsConf *tls.Config) error { } c.r = c.tlsConn c.w = c.tlsConn - frameType, data, err := ReadUnpackedResponse(c) + frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return err } @@ -452,7 +452,7 @@ func (c *Conn) upgradeDeflate(level int) error { fw, _ := flate.NewWriter(conn, level) c.r = flate.NewReader(conn) c.w = fw - frameType, data, err := ReadUnpackedResponse(c) + frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return err } @@ -469,7 +469,7 @@ func (c *Conn) upgradeSnappy() error { } c.r = snappy.NewReader(conn) c.w = snappy.NewWriter(conn) - frameType, data, err := ReadUnpackedResponse(c) + frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return err } @@ -490,7 +490,7 @@ func (c *Conn) auth(secret string) error { return err } - frameType, data, err := ReadUnpackedResponse(c) + frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return err } @@ -518,7 +518,7 @@ func (c *Conn) readLoop() { goto exit } - frameType, data, err := ReadUnpackedResponse(c) + frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 { goto exit diff --git a/producer_test.go b/producer_test.go index 2b6f89ba..e0cf7bf1 100755 --- a/producer_test.go +++ b/producer_test.go @@ -9,6 +9,7 @@ import ( "os" "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -255,6 +256,28 @@ func TestProducerHeartbeat(t *testing.T) { readMessages(topicName, t, msgCount+1) } +func TestProducerHTTPConnectionFails(t *testing.T) { + config := NewConfig() + laddr := "127.0.0.1" + + config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0") + config.MaxMsgSize = 1048576 + + w, _ := NewProducer("127.0.0.1:4151", config) + w.SetLogger(nullLogger, LogLevelInfo) + + err := w.Publish("write_test", []byte("test")) + if err == nil { + t.Fatal("should fail connecting to HTTP endpoint", err) + } + + if !strings.Contains(err.Error(), "unexpected HTTP response") { + t.Fatalf("should detect unexpected HTTP response, but got err: %s", err) + } + + w.Stop() +} + func readMessages(topicName string, t *testing.T, msgCount int) { config := NewConfig() config.DefaultRequeueDelay = 0 diff --git a/protocol.go b/protocol.go index 1d20851b..356c4d25 100644 --- a/protocol.go +++ b/protocol.go @@ -21,6 +21,9 @@ const ( FrameTypeMessage int32 = 2 ) +// Used to detect if an unexpected HTTP response is read +const httpResponseMsgSize = 1213486160 + var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`) // IsValidTopicName checks a topic name for correctness @@ -48,7 +51,7 @@ func isValidName(name string) bool { // | 4-byte || N-byte // ------------------------... // size data -func ReadResponse(r io.Reader) ([]byte, error) { +func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) { var msgSize int32 // message size @@ -60,6 +63,14 @@ func ReadResponse(r io.Reader) ([]byte, error) { if msgSize < 0 { return nil, fmt.Errorf("response msg size is negative: %v", msgSize) } + + if maxMsgSize > 0 && msgSize > maxMsgSize { + if msgSize == httpResponseMsgSize { + return nil, fmt.Errorf("unexpected HTTP response, a nsqd TCP endpoint is required") + } + return nil, fmt.Errorf("response msg size %v exceeds configured maximum (%v)", msgSize, maxMsgSize) + } + // message binary data buf := make([]byte, msgSize) _, err = io.ReadFull(r, buf) @@ -91,8 +102,8 @@ func UnpackResponse(response []byte) (int32, []byte, error) { // ReadUnpackedResponse reads and parses data from the underlying // TCP connection according to the NSQ TCP protocol spec and // returns the frameType, data or error -func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) { - resp, err := ReadResponse(r) +func ReadUnpackedResponse(r io.Reader, maxMsgSize int32) (int32, []byte, error) { + resp, err := ReadResponse(r, maxMsgSize) if err != nil { return -1, nil, err } From d9c722e4f446f64e42dd05d3d2c9e8ee146d9337 Mon Sep 17 00:00:00 2001 From: guoguangwu <guoguangwu@magic-shield.com> Date: Fri, 14 Jul 2023 16:14:21 +0800 Subject: [PATCH 4/8] chore: use time.Since instead of time.Now().Sub --- conn.go | 4 ++-- consumer.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/conn.go b/conn.go index 0c382550..321da690 100644 --- a/conn.go +++ b/conn.go @@ -692,7 +692,7 @@ func (c *Conn) cleanup() { msgsInFlight = atomic.LoadInt64(&c.messagesInFlight) } if msgsInFlight > 0 { - if time.Now().Sub(lastWarning) > time.Second { + if time.Since(lastWarning) > time.Second { c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight) lastWarning = time.Now() } @@ -701,7 +701,7 @@ func (c *Conn) cleanup() { // until the readLoop has exited we cannot be sure that there // still won't be a race if atomic.LoadInt32(&c.readLoopRunning) == 1 { - if time.Now().Sub(lastWarning) > time.Second { + if time.Since(lastWarning) > time.Second { c.log(LogLevelWarning, "draining... readLoop still running") lastWarning = time.Now() } diff --git a/consumer.go b/consumer.go index 895c89a5..390002a6 100644 --- a/consumer.go +++ b/consumer.go @@ -1033,8 +1033,8 @@ func (r *Consumer) redistributeRDY() { possibleConns := make([]*Conn, 0, len(conns)) for _, c := range conns { - lastMsgDuration := time.Now().Sub(c.LastMessageTime()) - lastRdyDuration := time.Now().Sub(c.LastRdyTime()) + lastMsgDuration := time.Since(c.LastMessageTime()) + lastRdyDuration := time.Since(c.LastRdyTime()) rdyCount := c.RDY() r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)", c.String(), rdyCount, lastMsgDuration) From 9b091e0f1e58bbb1e177f02c91119893d6cdf26a Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar <jehiah@gmail.com> Date: Fri, 8 Sep 2023 10:57:21 -0400 Subject: [PATCH 5/8] go.mod: upgrade snappy => v0.0.4; upgrade go.mod version 1.17+ --- go.mod | 4 ++-- go.sum | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 8b61b043..d6cfc37e 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/nsqio/go-nsq -go 1.11 +go 1.17 -require github.com/golang/snappy v0.0.1 +require github.com/golang/snappy v0.0.4 diff --git a/go.sum b/go.sum index 331e6a1a..74eae48d 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= From 9982462b80bbfd6c47fdf0a05060626a1c9f2c8c Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar <jehiah@gmail.com> Date: Fri, 20 Nov 2020 21:21:21 -0500 Subject: [PATCH 6/8] Set topology region+zone and send in IDENTIFY msg --- config.go | 4 ++++ conn.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/config.go b/config.go index 38fffe45..6fa9f510 100644 --- a/config.go +++ b/config.go @@ -140,6 +140,10 @@ type Config struct { Hostname string `opt:"hostname"` UserAgent string `opt:"user_agent"` + // Topology hints allow nsqd to prefer same zone and same region consumers + TopologyRegion string `opt:"topology_region"` + TopologyZone string `opt:"topology_zone"` + // Duration of time between heartbeats. This must be less than ReadTimeout HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"` // Integer percentage to sample the channel (requires nsqd 0.2.25+) diff --git a/conn.go b/conn.go index 321da690..0f5ff3a2 100644 --- a/conn.go +++ b/conn.go @@ -345,6 +345,8 @@ func (c *Conn) identify() (*IdentifyResponse, error) { ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond) } ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond) + ci["topology_region"] = c.config.TopologyRegion + ci["topology_zone"] = c.config.TopologyZone cmd, err := Identify(ci) if err != nil { return nil, ErrIdentify{err.Error()} From 4007ceb6bd754f2c375cf417ec4b14f2181eb129 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= <houyao@newrank.cn> Date: Tue, 30 Jul 2024 14:56:40 +0800 Subject: [PATCH 7/8] style: Only do code optimization --- api_request.go | 4 ++-- config.go | 15 +++++++-------- conn.go | 5 ++--- consumer.go | 20 +++++++++----------- consumer_test.go | 6 +++--- mock_test.go | 2 +- producer.go | 5 ++--- producer_test.go | 4 ++-- protocol.go | 20 ++++++++++---------- 9 files changed, 38 insertions(+), 43 deletions(-) diff --git a/api_request.go b/api_request.go index e565e7cf..b88984df 100644 --- a/api_request.go +++ b/api_request.go @@ -3,7 +3,7 @@ package nsq import ( "encoding/json" "fmt" - "io/ioutil" + "io" "net" "net/http" "time" @@ -47,7 +47,7 @@ func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint stri return err } - respBody, err := ioutil.ReadAll(resp.Body) + respBody, err := io.ReadAll(resp.Body) resp.Body.Close() if err != nil { return err diff --git a/config.go b/config.go index 38fffe45..b7d877ee 100644 --- a/config.go +++ b/config.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "errors" "fmt" - "io/ioutil" "log" "math" "math/rand" @@ -208,15 +207,15 @@ func NewConfig() *Config { // // Calls to Set() that take a time.Duration as an argument can be input as: // -// "1000ms" (a string parsed by time.ParseDuration()) -// 1000 (an integer interpreted as milliseconds) -// 1000*time.Millisecond (a literal time.Duration value) +// "1000ms" (a string parsed by time.ParseDuration()) +// 1000 (an integer interpreted as milliseconds) +// 1000*time.Millisecond (a literal time.Duration value) // // Calls to Set() that take bool can be input as: // -// "true" (a string parsed by strconv.ParseBool()) -// true (a boolean) -// 1 (an int where 1 == true and 0 == false) +// "true" (a string parsed by strconv.ParseBool()) +// true (a boolean) +// 1 (an int where 1 == true and 0 == false) // // It returns an error for an invalid option or value. func (c *Config) Set(option string, value interface{}) error { @@ -434,7 +433,7 @@ func (t *tlsConfig) Set(c *Config, option string, value interface{}) error { return fmt.Errorf("ERROR: %v is not a string", value) } tlsCertPool := x509.NewCertPool() - caCertFile, err := ioutil.ReadFile(filename) + caCertFile, err := os.ReadFile(filename) if err != nil { return fmt.Errorf("ERROR: failed to read custom Certificate Authority file %s", err) } diff --git a/conn.go b/conn.go index 321da690..8ec8a4ab 100644 --- a/conn.go +++ b/conn.go @@ -119,8 +119,7 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn { // The logger parameter is an interface that requires the following // method to be implemented (such as the the stdlib log.Logger): // -// Output(calldepth int, s string) -// +// Output(calldepth int, s string) func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) { c.logGuard.Lock() defer c.logGuard.Unlock() @@ -468,7 +467,7 @@ func (c *Conn) upgradeSnappy() error { conn = c.tlsConn } c.r = snappy.NewReader(conn) - c.w = snappy.NewWriter(conn) + c.w = snappy.NewBufferedWriter(conn) frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return err diff --git a/consumer.go b/consumer.go index 390002a6..77d0acdc 100644 --- a/consumer.go +++ b/consumer.go @@ -33,9 +33,9 @@ type Handler interface { // HandlerFunc is a convenience type to avoid having to declare a struct // to implement the Handler interface, it can be used like this: // -// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error { -// // handle the message -// })) +// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error { +// // handle the message +// })) type HandlerFunc func(message *Message) error // HandleMessage implements the Handler interface @@ -220,8 +220,7 @@ func (r *Consumer) conns() []*Conn { // The logger parameter is an interface that requires the following // method to be implemented (such as the the stdlib log.Logger): // -// Output(calldepth int, s string) error -// +// Output(calldepth int, s string) error func (r *Consumer) SetLogger(l logger, lvl LogLevel) { r.logGuard.Lock() defer r.logGuard.Unlock() @@ -266,8 +265,7 @@ func (r *Consumer) getLogLevel() LogLevel { // of the following interfaces that modify the behavior // of the `Consumer`: // -// DiscoveryFilter -// +// DiscoveryFilter func (r *Consumer) SetBehaviorDelegate(cb interface{}) { matched := false @@ -312,7 +310,7 @@ func (r *Consumer) getMaxInFlight() int32 { // ChangeMaxInFlight sets a new maximum number of messages this comsumer instance // will allow in-flight, and updates all existing connections as appropriate. // -// For example, ChangeMaxInFlight(0) would pause message flow +// # For example, ChangeMaxInFlight(0) would pause message flow // // If already connected, it updates the reader RDY state for each connection. func (r *Consumer) ChangeMaxInFlight(maxInFlight int) { @@ -1109,7 +1107,7 @@ func (r *Consumer) stopHandlers() { // AddHandler sets the Handler for messages received by this Consumer. This can be called // multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines. // -// This panics if called after connecting to NSQD or NSQ Lookupd +// # This panics if called after connecting to NSQD or NSQ Lookupd // // (see Handler or HandlerFunc for details on implementing this interface) func (r *Consumer) AddHandler(handler Handler) { @@ -1120,7 +1118,7 @@ func (r *Consumer) AddHandler(handler Handler) { // takes a second argument which indicates the number of goroutines to spawn for // message handling. // -// This panics if called after connecting to NSQD or NSQ Lookupd +// # This panics if called after connecting to NSQD or NSQ Lookupd // // (see Handler or HandlerFunc for details on implementing this interface) func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) { @@ -1228,7 +1226,7 @@ func buildLookupAddr(addr, topic string) (string, error) { u.Path = "/lookup" } - v, err := url.ParseQuery(u.RawQuery) + v, _ := url.ParseQuery(u.RawQuery) v.Add("topic", topic) u.RawQuery = v.Encode() return u.String(), nil diff --git a/consumer_test.go b/consumer_test.go index dcb34df0..945f5c0c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -6,7 +6,7 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" + "io" "log" "net" "net/http" @@ -25,7 +25,7 @@ type MyTestHandler struct { messagesFailed int } -var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags) +var nullLogger = log.New(io.Discard, "", log.LstdFlags) func (h *MyTestHandler) LogFailedMessage(message *Message) { h.messagesFailed++ @@ -58,7 +58,7 @@ func (h *MyTestHandler) HandleMessage(message *Message) error { func SendMessage(t *testing.T, port int, topic string, method string, body []byte) { httpclient := &http.Client{} endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", port, method, topic) - req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(body)) + req, _ := http.NewRequest("POST", endpoint, bytes.NewBuffer(body)) resp, err := httpclient.Do(req) if err != nil { t.Fatalf(err.Error()) diff --git a/mock_test.go b/mock_test.go index 057442a3..44656630 100644 --- a/mock_test.go +++ b/mock_test.go @@ -184,7 +184,7 @@ func framedResponse(frameType int32, data []byte) []byte { return nil } - _, err = w.Write(data) + _, _ = w.Write(data) return w.Bytes() } diff --git a/producer.go b/producer.go index 853714f4..4019fefd 100644 --- a/producer.go +++ b/producer.go @@ -92,7 +92,7 @@ func NewProducer(addr string, config *Config) (*Producer, error) { // Set default logger for all log levels l := log.New(os.Stderr, "", log.Flags()) - for index, _ := range p.logger { + for index := range p.logger { p.logger[index] = l } return p, nil @@ -120,8 +120,7 @@ func (w *Producer) Ping() error { // The logger parameter is an interface that requires the following // method to be implemented (such as the the stdlib log.Logger): // -// Output(calldepth int, s string) -// +// Output(calldepth int, s string) func (w *Producer) SetLogger(l logger, lvl LogLevel) { w.logGuard.Lock() defer w.logGuard.Unlock() diff --git a/producer_test.go b/producer_test.go index e0cf7bf1..de895ad7 100755 --- a/producer_test.go +++ b/producer_test.go @@ -3,7 +3,7 @@ package nsq import ( "bytes" "errors" - "io/ioutil" + "io" "log" "net" "os" @@ -63,7 +63,7 @@ func TestProducerConnection(t *testing.T) { } func TestProducerPing(t *testing.T) { - log.SetOutput(ioutil.Discard) + log.SetOutput(io.Discard) defer log.SetOutput(os.Stdout) config := NewConfig() diff --git a/protocol.go b/protocol.go index 356c4d25..1d0e1a9d 100644 --- a/protocol.go +++ b/protocol.go @@ -46,11 +46,11 @@ func isValidName(name string) bool { // ReadResponse is a client-side utility function to read from the supplied Reader // according to the NSQ protocol spec: // -// [x][x][x][x][x][x][x][x]... -// | (int32) || (binary) -// | 4-byte || N-byte -// ------------------------... -// size data +// [x][x][x][x][x][x][x][x]... +// | (int32) || (binary) +// | 4-byte || N-byte +// ------------------------... +// size data func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) { var msgSize int32 @@ -84,11 +84,11 @@ func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) { // UnpackResponse is a client-side utility function that unpacks serialized data // according to NSQ protocol spec: // -// [x][x][x][x][x][x][x][x]... -// | (int32) || (binary) -// | 4-byte || N-byte -// ------------------------... -// frame ID data +// [x][x][x][x][x][x][x][x]... +// | (int32) || (binary) +// | 4-byte || N-byte +// ------------------------... +// frame ID data // // Returns a triplicate of: frame type, data ([]byte), error func UnpackResponse(response []byte) (int32, []byte, error) { From d54534195e974483777d45774856cf315af7d2f7 Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar <jehiah@gmail.com> Date: Thu, 13 Mar 2025 09:40:46 -0400 Subject: [PATCH 8/8] CI updates --- .github/workflows/test.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a0062bd..070dae1b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,26 +6,26 @@ on: jobs: test: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 30 strategy: fail-fast: false matrix: imgtag: - - "golang:1.15-buster" - - "golang:1.16-buster" - - "golang:1.17-buster" + - "golang:1.23-bullseye" + - "golang:1.24-bullseye" goarch: - "amd64" nsq_ver: - "nsq-1.1.0.linux-amd64.go1.10.3" - "nsq-1.2.0.linux-amd64.go1.12.9" - "nsq-1.2.1.linux-amd64.go1.16.6" + - "nsq-1.3.0.linux-amd64.go1.21.5" include: # test 386 only against latest version of NSQ - - imgtag: "golang:1.17-buster" + - imgtag: "golang:1.24-bullseye" goarch: "386" - nsq_ver: "nsq-1.2.1.linux-amd64.go1.16.6" + nsq_ver: "nsq-1.3.0.linux-amd64.go1.21.5" container: "${{matrix.imgtag}}" env: