From 5f35ce8f2a0ee2acb2ae7626bf90ea12666d4025 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Mon, 25 Oct 2021 13:11:05 +0300
Subject: [PATCH 1/8] consumer: lower old RDYs first, then assign to new
 connection

Currently when a consumer is connected to a new NSQD, the connection
is added to the pool of connections and then a loop iterates over all
of them, rebalancing the RDY values. The iteration order of a map is
random, thus it can happen that the consumer tries to assign the RDY
value to the new connection, before decreasing the existing ones. This
leads to a RDY of 0 being assigned.

This issue is even more pronounced when there are only two connections:
initially all the RDY are assigned the existing connection, and when a
new NSQD is connected, the old one needs to be cut in half and the half
just taken away gets assigned to the new connection. If map iteration
happens in reverse order - starting with the new connection - it will
get assigned 0 RDY and then the old connection gets cut in half. The
end result is blocked communication on the new NSQD instance until a new
round of rebalance is triggered.

This issue may be less relevant in long running processes, but it is
very annoying in tests where we're adding and remocing NSQD instances
and the test hangs from time to time due to a flaky RDY allocation.

The issue is fixed by fist iterating over all the existing connections
and rebalancing them, and only at the very end calling maybeUpdateRDY on
the new NSQD instance.
---
 consumer.go | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/consumer.go b/consumer.go
index b4d7487b..895c89a5 100644
--- a/consumer.go
+++ b/consumer.go
@@ -609,8 +609,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error {
 
 	// pre-emptive signal to existing connections to lower their RDY count
 	for _, c := range r.conns() {
-		r.maybeUpdateRDY(c)
+		if c != conn {
+			r.maybeUpdateRDY(c)
+		}
 	}
+	r.maybeUpdateRDY(conn)
 
 	return nil
 }
@@ -912,7 +915,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {
 
 	count := r.perConnMaxInFlight()
 	r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
-	r.updateRDY(conn, count)
+	if err := r.updateRDY(conn, count); err != nil {
+		r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err)
+	}
 }
 
 func (r *Consumer) rdyLoop() {

From 00567053fbb870b32790334c2fe68a32aaf4a3b2 Mon Sep 17 00:00:00 2001
From: Pierce Lopez <pierce.lopez@gmail.com>
Date: Sun, 5 Jun 2022 17:33:43 -0400
Subject: [PATCH 2/8] producer: handle case of no transactions in
 popTransaction()

This corner case generally should not happen, but a nsqd bug or
strange network condition could possibly send erroneous bytes that
are interpreted as a response. The Producer should be robust,
and not panic the whole process.
---
 producer.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/producer.go b/producer.go
index 20fd0c87..853714f4 100644
--- a/producer.go
+++ b/producer.go
@@ -367,6 +367,17 @@ exit:
 }
 
 func (w *Producer) popTransaction(frameType int32, data []byte) {
+	if len(w.transactions) == 0 {
+		dataLen := len(data)
+		if dataLen > 32 {
+			data = data[:32]
+		}
+		w.log(LogLevelError,
+			"(%s) unexpected response type=%d len=%d data[:32]=0x%x",
+			w.conn.String(), frameType, dataLen, data)
+		w.close()
+		return
+	}
 	t := w.transactions[0]
 	w.transactions = w.transactions[1:]
 	if frameType == FrameTypeError {

From dc8315d50ebc69c13fd8eb3859d8ef6b690c9009 Mon Sep 17 00:00:00 2001
From: Jorge Carpio <carpioldc@gmail.com>
Date: Thu, 12 Jan 2023 17:48:02 +0100
Subject: [PATCH 3/8] Limit read message size

- Add MaxMsgSize to configuration mimicking the nsqd cofiguration key.
  It defaults to 1048576 as the nsqd config -- source:
  https://github.com/nsqio/nsq/blob/a4939964f6715edd27a6904b87c2f9eb6a45e749/nsqd/options.go#L130

- Pass MaxMsgSize to ReadResponse via its caller, ReadUnpackedResponse.
  Check msgSize does not exceed the maximum in ReadResponse. This
  reduces the risk of attempting to read arbitrary (non-nsq) responses.

- Generate custom error if msgSize is the result of deserializing the
  first 4 bytes of an HTTP response (1213486160) to facilitate troublehooting.

Default maxMsgSize to 0 for no limit

Add unexpected HTTP response test

Fix indentation
---
 config.go        |  3 +++
 conn.go          | 12 ++++++------
 producer_test.go | 23 +++++++++++++++++++++++
 protocol.go      | 17 ++++++++++++++---
 4 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/config.go b/config.go
index 1f7ea2cd..38fffe45 100644
--- a/config.go
+++ b/config.go
@@ -177,6 +177,9 @@ type Config struct {
 	// The server-side message timeout for messages delivered to this client
 	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
 
+	// Maximum size of a single message in bytes (0 means no limit)
+	MaxMsgSize int32 `opt:"max_msg_size" min:"0" default:"0"`
+
 	// Secret for nsqd authentication (requires nsqd 0.2.29+)
 	AuthSecret string `opt:"auth_secret"`
 	// Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries
diff --git a/conn.go b/conn.go
index 6fa1ce89..0c382550 100644
--- a/conn.go
+++ b/conn.go
@@ -355,7 +355,7 @@ func (c *Conn) identify() (*IdentifyResponse, error) {
 		return nil, ErrIdentify{err.Error()}
 	}
 
-	frameType, data, err := ReadUnpackedResponse(c)
+	frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 	if err != nil {
 		return nil, ErrIdentify{err.Error()}
 	}
@@ -434,7 +434,7 @@ func (c *Conn) upgradeTLS(tlsConf *tls.Config) error {
 	}
 	c.r = c.tlsConn
 	c.w = c.tlsConn
-	frameType, data, err := ReadUnpackedResponse(c)
+	frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 	if err != nil {
 		return err
 	}
@@ -452,7 +452,7 @@ func (c *Conn) upgradeDeflate(level int) error {
 	fw, _ := flate.NewWriter(conn, level)
 	c.r = flate.NewReader(conn)
 	c.w = fw
-	frameType, data, err := ReadUnpackedResponse(c)
+	frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 	if err != nil {
 		return err
 	}
@@ -469,7 +469,7 @@ func (c *Conn) upgradeSnappy() error {
 	}
 	c.r = snappy.NewReader(conn)
 	c.w = snappy.NewWriter(conn)
-	frameType, data, err := ReadUnpackedResponse(c)
+	frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 	if err != nil {
 		return err
 	}
@@ -490,7 +490,7 @@ func (c *Conn) auth(secret string) error {
 		return err
 	}
 
-	frameType, data, err := ReadUnpackedResponse(c)
+	frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 	if err != nil {
 		return err
 	}
@@ -518,7 +518,7 @@ func (c *Conn) readLoop() {
 			goto exit
 		}
 
-		frameType, data, err := ReadUnpackedResponse(c)
+		frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 		if err != nil {
 			if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
 				goto exit
diff --git a/producer_test.go b/producer_test.go
index 2b6f89ba..e0cf7bf1 100755
--- a/producer_test.go
+++ b/producer_test.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"runtime"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -255,6 +256,28 @@ func TestProducerHeartbeat(t *testing.T) {
 	readMessages(topicName, t, msgCount+1)
 }
 
+func TestProducerHTTPConnectionFails(t *testing.T) {
+	config := NewConfig()
+	laddr := "127.0.0.1"
+
+	config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0")
+	config.MaxMsgSize = 1048576
+
+	w, _ := NewProducer("127.0.0.1:4151", config)
+	w.SetLogger(nullLogger, LogLevelInfo)
+
+	err := w.Publish("write_test", []byte("test"))
+	if err == nil {
+		t.Fatal("should fail connecting to HTTP endpoint", err)
+	}
+
+	if !strings.Contains(err.Error(), "unexpected HTTP response") {
+		t.Fatalf("should detect unexpected HTTP response, but got err: %s", err)
+	}
+
+	w.Stop()
+}
+
 func readMessages(topicName string, t *testing.T, msgCount int) {
 	config := NewConfig()
 	config.DefaultRequeueDelay = 0
diff --git a/protocol.go b/protocol.go
index 1d20851b..356c4d25 100644
--- a/protocol.go
+++ b/protocol.go
@@ -21,6 +21,9 @@ const (
 	FrameTypeMessage  int32 = 2
 )
 
+// Used to detect if an unexpected HTTP response is read
+const httpResponseMsgSize = 1213486160
+
 var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
 
 // IsValidTopicName checks a topic name for correctness
@@ -48,7 +51,7 @@ func isValidName(name string) bool {
 //    |  4-byte  || N-byte
 //    ------------------------...
 //        size       data
-func ReadResponse(r io.Reader) ([]byte, error) {
+func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) {
 	var msgSize int32
 
 	// message size
@@ -60,6 +63,14 @@ func ReadResponse(r io.Reader) ([]byte, error) {
 	if msgSize < 0 {
 		return nil, fmt.Errorf("response msg size is negative: %v", msgSize)
 	}
+
+	if maxMsgSize > 0 && msgSize > maxMsgSize {
+		if msgSize == httpResponseMsgSize {
+			return nil, fmt.Errorf("unexpected HTTP response, a nsqd TCP endpoint is required")
+		}
+		return nil, fmt.Errorf("response msg size %v exceeds configured maximum (%v)", msgSize, maxMsgSize)
+	}
+
 	// message binary data
 	buf := make([]byte, msgSize)
 	_, err = io.ReadFull(r, buf)
@@ -91,8 +102,8 @@ func UnpackResponse(response []byte) (int32, []byte, error) {
 // ReadUnpackedResponse reads and parses data from the underlying
 // TCP connection according to the NSQ TCP protocol spec and
 // returns the frameType, data or error
-func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) {
-	resp, err := ReadResponse(r)
+func ReadUnpackedResponse(r io.Reader, maxMsgSize int32) (int32, []byte, error) {
+	resp, err := ReadResponse(r, maxMsgSize)
 	if err != nil {
 		return -1, nil, err
 	}

From d9c722e4f446f64e42dd05d3d2c9e8ee146d9337 Mon Sep 17 00:00:00 2001
From: guoguangwu <guoguangwu@magic-shield.com>
Date: Fri, 14 Jul 2023 16:14:21 +0800
Subject: [PATCH 4/8] chore: use time.Since instead of time.Now().Sub

---
 conn.go     | 4 ++--
 consumer.go | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/conn.go b/conn.go
index 0c382550..321da690 100644
--- a/conn.go
+++ b/conn.go
@@ -692,7 +692,7 @@ func (c *Conn) cleanup() {
 			msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
 		}
 		if msgsInFlight > 0 {
-			if time.Now().Sub(lastWarning) > time.Second {
+			if time.Since(lastWarning) > time.Second {
 				c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
 				lastWarning = time.Now()
 			}
@@ -701,7 +701,7 @@ func (c *Conn) cleanup() {
 		// until the readLoop has exited we cannot be sure that there
 		// still won't be a race
 		if atomic.LoadInt32(&c.readLoopRunning) == 1 {
-			if time.Now().Sub(lastWarning) > time.Second {
+			if time.Since(lastWarning) > time.Second {
 				c.log(LogLevelWarning, "draining... readLoop still running")
 				lastWarning = time.Now()
 			}
diff --git a/consumer.go b/consumer.go
index 895c89a5..390002a6 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1033,8 +1033,8 @@ func (r *Consumer) redistributeRDY() {
 
 	possibleConns := make([]*Conn, 0, len(conns))
 	for _, c := range conns {
-		lastMsgDuration := time.Now().Sub(c.LastMessageTime())
-		lastRdyDuration := time.Now().Sub(c.LastRdyTime())
+		lastMsgDuration := time.Since(c.LastMessageTime())
+		lastRdyDuration := time.Since(c.LastRdyTime())
 		rdyCount := c.RDY()
 		r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
 			c.String(), rdyCount, lastMsgDuration)

From 9b091e0f1e58bbb1e177f02c91119893d6cdf26a Mon Sep 17 00:00:00 2001
From: Jehiah Czebotar <jehiah@gmail.com>
Date: Fri, 8 Sep 2023 10:57:21 -0400
Subject: [PATCH 5/8] go.mod: upgrade snappy => v0.0.4; upgrade go.mod version
 1.17+

---
 go.mod | 4 ++--
 go.sum | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/go.mod b/go.mod
index 8b61b043..d6cfc37e 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,5 @@
 module github.com/nsqio/go-nsq
 
-go 1.11
+go 1.17
 
-require github.com/golang/snappy v0.0.1
+require github.com/golang/snappy v0.0.4
diff --git a/go.sum b/go.sum
index 331e6a1a..74eae48d 100644
--- a/go.sum
+++ b/go.sum
@@ -1,2 +1,2 @@
-github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
-github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=

From 9982462b80bbfd6c47fdf0a05060626a1c9f2c8c Mon Sep 17 00:00:00 2001
From: Jehiah Czebotar <jehiah@gmail.com>
Date: Fri, 20 Nov 2020 21:21:21 -0500
Subject: [PATCH 6/8] Set topology region+zone and send in IDENTIFY msg

---
 config.go | 4 ++++
 conn.go   | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/config.go b/config.go
index 38fffe45..6fa9f510 100644
--- a/config.go
+++ b/config.go
@@ -140,6 +140,10 @@ type Config struct {
 	Hostname  string `opt:"hostname"`
 	UserAgent string `opt:"user_agent"`
 
+	// Topology hints allow nsqd to prefer same zone and same region consumers
+	TopologyRegion string `opt:"topology_region"`
+	TopologyZone   string `opt:"topology_zone"`
+
 	// Duration of time between heartbeats. This must be less than ReadTimeout
 	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
 	// Integer percentage to sample the channel (requires nsqd 0.2.25+)
diff --git a/conn.go b/conn.go
index 321da690..0f5ff3a2 100644
--- a/conn.go
+++ b/conn.go
@@ -345,6 +345,8 @@ func (c *Conn) identify() (*IdentifyResponse, error) {
 		ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond)
 	}
 	ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
+	ci["topology_region"] = c.config.TopologyRegion
+	ci["topology_zone"] = c.config.TopologyZone
 	cmd, err := Identify(ci)
 	if err != nil {
 		return nil, ErrIdentify{err.Error()}

From 4007ceb6bd754f2c375cf417ec4b14f2181eb129 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BE=AF=E5=B0=A7?= <houyao@newrank.cn>
Date: Tue, 30 Jul 2024 14:56:40 +0800
Subject: [PATCH 7/8] style: Only do code optimization

---
 api_request.go   |  4 ++--
 config.go        | 15 +++++++--------
 conn.go          |  5 ++---
 consumer.go      | 20 +++++++++-----------
 consumer_test.go |  6 +++---
 mock_test.go     |  2 +-
 producer.go      |  5 ++---
 producer_test.go |  4 ++--
 protocol.go      | 20 ++++++++++----------
 9 files changed, 38 insertions(+), 43 deletions(-)

diff --git a/api_request.go b/api_request.go
index e565e7cf..b88984df 100644
--- a/api_request.go
+++ b/api_request.go
@@ -3,7 +3,7 @@ package nsq
 import (
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
+	"io"
 	"net"
 	"net/http"
 	"time"
@@ -47,7 +47,7 @@ func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint stri
 		return err
 	}
 
-	respBody, err := ioutil.ReadAll(resp.Body)
+	respBody, err := io.ReadAll(resp.Body)
 	resp.Body.Close()
 	if err != nil {
 		return err
diff --git a/config.go b/config.go
index 38fffe45..b7d877ee 100644
--- a/config.go
+++ b/config.go
@@ -5,7 +5,6 @@ import (
 	"crypto/x509"
 	"errors"
 	"fmt"
-	"io/ioutil"
 	"log"
 	"math"
 	"math/rand"
@@ -208,15 +207,15 @@ func NewConfig() *Config {
 //
 // Calls to Set() that take a time.Duration as an argument can be input as:
 //
-// 	"1000ms" (a string parsed by time.ParseDuration())
-// 	1000 (an integer interpreted as milliseconds)
-// 	1000*time.Millisecond (a literal time.Duration value)
+//	"1000ms" (a string parsed by time.ParseDuration())
+//	1000 (an integer interpreted as milliseconds)
+//	1000*time.Millisecond (a literal time.Duration value)
 //
 // Calls to Set() that take bool can be input as:
 //
-// 	"true" (a string parsed by strconv.ParseBool())
-// 	true (a boolean)
-// 	1 (an int where 1 == true and 0 == false)
+//	"true" (a string parsed by strconv.ParseBool())
+//	true (a boolean)
+//	1 (an int where 1 == true and 0 == false)
 //
 // It returns an error for an invalid option or value.
 func (c *Config) Set(option string, value interface{}) error {
@@ -434,7 +433,7 @@ func (t *tlsConfig) Set(c *Config, option string, value interface{}) error {
 			return fmt.Errorf("ERROR: %v is not a string", value)
 		}
 		tlsCertPool := x509.NewCertPool()
-		caCertFile, err := ioutil.ReadFile(filename)
+		caCertFile, err := os.ReadFile(filename)
 		if err != nil {
 			return fmt.Errorf("ERROR: failed to read custom Certificate Authority file %s", err)
 		}
diff --git a/conn.go b/conn.go
index 321da690..8ec8a4ab 100644
--- a/conn.go
+++ b/conn.go
@@ -119,8 +119,7 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
 // The logger parameter is an interface that requires the following
 // method to be implemented (such as the the stdlib log.Logger):
 //
-//    Output(calldepth int, s string)
-//
+//	Output(calldepth int, s string)
 func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
 	c.logGuard.Lock()
 	defer c.logGuard.Unlock()
@@ -468,7 +467,7 @@ func (c *Conn) upgradeSnappy() error {
 		conn = c.tlsConn
 	}
 	c.r = snappy.NewReader(conn)
-	c.w = snappy.NewWriter(conn)
+	c.w = snappy.NewBufferedWriter(conn)
 	frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
 	if err != nil {
 		return err
diff --git a/consumer.go b/consumer.go
index 390002a6..77d0acdc 100644
--- a/consumer.go
+++ b/consumer.go
@@ -33,9 +33,9 @@ type Handler interface {
 // HandlerFunc is a convenience type to avoid having to declare a struct
 // to implement the Handler interface, it can be used like this:
 //
-// 	consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
-// 		// handle the message
-// 	}))
+//	consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
+//		// handle the message
+//	}))
 type HandlerFunc func(message *Message) error
 
 // HandleMessage implements the Handler interface
@@ -220,8 +220,7 @@ func (r *Consumer) conns() []*Conn {
 // The logger parameter is an interface that requires the following
 // method to be implemented (such as the the stdlib log.Logger):
 //
-//    Output(calldepth int, s string) error
-//
+//	Output(calldepth int, s string) error
 func (r *Consumer) SetLogger(l logger, lvl LogLevel) {
 	r.logGuard.Lock()
 	defer r.logGuard.Unlock()
@@ -266,8 +265,7 @@ func (r *Consumer) getLogLevel() LogLevel {
 // of the following interfaces that modify the behavior
 // of the `Consumer`:
 //
-//    DiscoveryFilter
-//
+//	DiscoveryFilter
 func (r *Consumer) SetBehaviorDelegate(cb interface{}) {
 	matched := false
 
@@ -312,7 +310,7 @@ func (r *Consumer) getMaxInFlight() int32 {
 // ChangeMaxInFlight sets a new maximum number of messages this comsumer instance
 // will allow in-flight, and updates all existing connections as appropriate.
 //
-// For example, ChangeMaxInFlight(0) would pause message flow
+// # For example, ChangeMaxInFlight(0) would pause message flow
 //
 // If already connected, it updates the reader RDY state for each connection.
 func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
@@ -1109,7 +1107,7 @@ func (r *Consumer) stopHandlers() {
 // AddHandler sets the Handler for messages received by this Consumer. This can be called
 // multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
 //
-// This panics if called after connecting to NSQD or NSQ Lookupd
+// # This panics if called after connecting to NSQD or NSQ Lookupd
 //
 // (see Handler or HandlerFunc for details on implementing this interface)
 func (r *Consumer) AddHandler(handler Handler) {
@@ -1120,7 +1118,7 @@ func (r *Consumer) AddHandler(handler Handler) {
 // takes a second argument which indicates the number of goroutines to spawn for
 // message handling.
 //
-// This panics if called after connecting to NSQD or NSQ Lookupd
+// # This panics if called after connecting to NSQD or NSQ Lookupd
 //
 // (see Handler or HandlerFunc for details on implementing this interface)
 func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
@@ -1228,7 +1226,7 @@ func buildLookupAddr(addr, topic string) (string, error) {
 		u.Path = "/lookup"
 	}
 
-	v, err := url.ParseQuery(u.RawQuery)
+	v, _ := url.ParseQuery(u.RawQuery)
 	v.Add("topic", topic)
 	u.RawQuery = v.Encode()
 	return u.String(), nil
diff --git a/consumer_test.go b/consumer_test.go
index dcb34df0..945f5c0c 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -6,7 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io/ioutil"
+	"io"
 	"log"
 	"net"
 	"net/http"
@@ -25,7 +25,7 @@ type MyTestHandler struct {
 	messagesFailed   int
 }
 
-var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)
+var nullLogger = log.New(io.Discard, "", log.LstdFlags)
 
 func (h *MyTestHandler) LogFailedMessage(message *Message) {
 	h.messagesFailed++
@@ -58,7 +58,7 @@ func (h *MyTestHandler) HandleMessage(message *Message) error {
 func SendMessage(t *testing.T, port int, topic string, method string, body []byte) {
 	httpclient := &http.Client{}
 	endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", port, method, topic)
-	req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(body))
+	req, _ := http.NewRequest("POST", endpoint, bytes.NewBuffer(body))
 	resp, err := httpclient.Do(req)
 	if err != nil {
 		t.Fatalf(err.Error())
diff --git a/mock_test.go b/mock_test.go
index 057442a3..44656630 100644
--- a/mock_test.go
+++ b/mock_test.go
@@ -184,7 +184,7 @@ func framedResponse(frameType int32, data []byte) []byte {
 		return nil
 	}
 
-	_, err = w.Write(data)
+	_, _ = w.Write(data)
 	return w.Bytes()
 }
 
diff --git a/producer.go b/producer.go
index 853714f4..4019fefd 100644
--- a/producer.go
+++ b/producer.go
@@ -92,7 +92,7 @@ func NewProducer(addr string, config *Config) (*Producer, error) {
 
 	// Set default logger for all log levels
 	l := log.New(os.Stderr, "", log.Flags())
-	for index, _ := range p.logger {
+	for index := range p.logger {
 		p.logger[index] = l
 	}
 	return p, nil
@@ -120,8 +120,7 @@ func (w *Producer) Ping() error {
 // The logger parameter is an interface that requires the following
 // method to be implemented (such as the the stdlib log.Logger):
 //
-//    Output(calldepth int, s string)
-//
+//	Output(calldepth int, s string)
 func (w *Producer) SetLogger(l logger, lvl LogLevel) {
 	w.logGuard.Lock()
 	defer w.logGuard.Unlock()
diff --git a/producer_test.go b/producer_test.go
index e0cf7bf1..de895ad7 100755
--- a/producer_test.go
+++ b/producer_test.go
@@ -3,7 +3,7 @@ package nsq
 import (
 	"bytes"
 	"errors"
-	"io/ioutil"
+	"io"
 	"log"
 	"net"
 	"os"
@@ -63,7 +63,7 @@ func TestProducerConnection(t *testing.T) {
 }
 
 func TestProducerPing(t *testing.T) {
-	log.SetOutput(ioutil.Discard)
+	log.SetOutput(io.Discard)
 	defer log.SetOutput(os.Stdout)
 
 	config := NewConfig()
diff --git a/protocol.go b/protocol.go
index 356c4d25..1d0e1a9d 100644
--- a/protocol.go
+++ b/protocol.go
@@ -46,11 +46,11 @@ func isValidName(name string) bool {
 // ReadResponse is a client-side utility function to read from the supplied Reader
 // according to the NSQ protocol spec:
 //
-//    [x][x][x][x][x][x][x][x]...
-//    |  (int32) || (binary)
-//    |  4-byte  || N-byte
-//    ------------------------...
-//        size       data
+//	[x][x][x][x][x][x][x][x]...
+//	|  (int32) || (binary)
+//	|  4-byte  || N-byte
+//	------------------------...
+//	    size       data
 func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) {
 	var msgSize int32
 
@@ -84,11 +84,11 @@ func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) {
 // UnpackResponse is a client-side utility function that unpacks serialized data
 // according to NSQ protocol spec:
 //
-//    [x][x][x][x][x][x][x][x]...
-//    |  (int32) || (binary)
-//    |  4-byte  || N-byte
-//    ------------------------...
-//      frame ID     data
+//	[x][x][x][x][x][x][x][x]...
+//	|  (int32) || (binary)
+//	|  4-byte  || N-byte
+//	------------------------...
+//	  frame ID     data
 //
 // Returns a triplicate of: frame type, data ([]byte), error
 func UnpackResponse(response []byte) (int32, []byte, error) {

From d54534195e974483777d45774856cf315af7d2f7 Mon Sep 17 00:00:00 2001
From: Jehiah Czebotar <jehiah@gmail.com>
Date: Thu, 13 Mar 2025 09:40:46 -0400
Subject: [PATCH 8/8] CI updates

---
 .github/workflows/test.yml | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 6a0062bd..070dae1b 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -6,26 +6,26 @@ on:
 
 jobs:
   test:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     timeout-minutes: 30
     strategy:
       fail-fast: false
       matrix:
         imgtag:
-          - "golang:1.15-buster"
-          - "golang:1.16-buster"
-          - "golang:1.17-buster"
+          - "golang:1.23-bullseye"
+          - "golang:1.24-bullseye"
         goarch:
           - "amd64"
         nsq_ver:
           - "nsq-1.1.0.linux-amd64.go1.10.3"
           - "nsq-1.2.0.linux-amd64.go1.12.9"
           - "nsq-1.2.1.linux-amd64.go1.16.6"
+          - "nsq-1.3.0.linux-amd64.go1.21.5"
         include:
           # test 386 only against latest version of NSQ
-          - imgtag: "golang:1.17-buster"
+          - imgtag: "golang:1.24-bullseye"
             goarch: "386"
-            nsq_ver: "nsq-1.2.1.linux-amd64.go1.16.6"
+            nsq_ver: "nsq-1.3.0.linux-amd64.go1.21.5"
           
     container: "${{matrix.imgtag}}"
     env: