Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: mreiferson/go-nsq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: nsqio/go-nsq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Checking mergeability… Don’t worry, you can still create the pull request.
  • 16 commits
  • 12 files changed
  • 7 contributors

Commits on Oct 25, 2021

  1. consumer: lower old RDYs first, then assign to new connection

    Currently when a consumer is connected to a new NSQD, the connection
    is added to the pool of connections and then a loop iterates over all
    of them, rebalancing the RDY values. The iteration order of a map is
    random, thus it can happen that the consumer tries to assign the RDY
    value to the new connection, before decreasing the existing ones. This
    leads to a RDY of 0 being assigned.
    
    This issue is even more pronounced when there are only two connections:
    initially all the RDY are assigned the existing connection, and when a
    new NSQD is connected, the old one needs to be cut in half and the half
    just taken away gets assigned to the new connection. If map iteration
    happens in reverse order - starting with the new connection - it will
    get assigned 0 RDY and then the old connection gets cut in half. The
    end result is blocked communication on the new NSQD instance until a new
    round of rebalance is triggered.
    
    This issue may be less relevant in long running processes, but it is
    very annoying in tests where we're adding and remocing NSQD instances
    and the test hangs from time to time due to a flaky RDY allocation.
    
    The issue is fixed by fist iterating over all the existing connections
    and rebalancing them, and only at the very end calling maybeUpdateRDY on
    the new NSQD instance.
    karalabe committed Oct 25, 2021

    Verified

    This commit was signed with the committer’s verified signature.
    karalabe Péter Szilágyi
    Copy the full SHA
    5f35ce8 View commit details

Commits on Nov 28, 2021

  1. Merge pull request nsqio#341 from karalabe/fix-rdy-distribution

    consumer: lower old RDYs first, then assign to new connection
    mreiferson authored Nov 28, 2021

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    827b836 View commit details

Commits on Aug 1, 2022

  1. producer: handle case of no transactions in popTransaction()

    This corner case generally should not happen, but a nsqd bug or
    strange network condition could possibly send erroneous bytes that
    are interpreted as a response. The Producer should be robust,
    and not panic the whole process.
    ploxiln committed Aug 1, 2022
    Copy the full SHA
    0056705 View commit details
  2. Merge pull request nsqio#346 from ploxiln/pop_no_transaction

    producer: handle case of no transactions in popTransaction()
    ploxiln authored Aug 1, 2022

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    0e8d7a7 View commit details

Commits on Mar 6, 2023

  1. Limit read message size

    - Add MaxMsgSize to configuration mimicking the nsqd cofiguration key.
      It defaults to 1048576 as the nsqd config -- source:
      https://github.com/nsqio/nsq/blob/a4939964f6715edd27a6904b87c2f9eb6a45e749/nsqd/options.go#L130
    
    - Pass MaxMsgSize to ReadResponse via its caller, ReadUnpackedResponse.
      Check msgSize does not exceed the maximum in ReadResponse. This
      reduces the risk of attempting to read arbitrary (non-nsq) responses.
    
    - Generate custom error if msgSize is the result of deserializing the
      first 4 bytes of an HTTP response (1213486160) to facilitate troublehooting.
    
    Default maxMsgSize to 0 for no limit
    
    Add unexpected HTTP response test
    
    Fix indentation
    ibice committed Mar 6, 2023
    Copy the full SHA
    dc8315d View commit details

Commits on May 30, 2023

  1. Merge pull request nsqio#353 from ibice/limit-read-message-size

    Limit read message size
    mreiferson authored May 30, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    c647fa6 View commit details

Commits on Jul 14, 2023

  1. Copy the full SHA
    d9c722e View commit details

Commits on Jul 16, 2023

  1. Merge pull request nsqio#356 from testwill/time

    chore: use time.Since instead of time.Now().Sub
    mreiferson authored Jul 16, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    14c9d1d View commit details

Commits on Sep 8, 2023

  1. Copy the full SHA
    9b091e0 View commit details

Commits on Sep 18, 2023

  1. Merge pull request nsqio#358 from jehiah/dependencies_358

    go.mod: upgrade snappy => v0.0.4; upgrade go.mod version 1.17+
    mreiferson authored Sep 18, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    c2c3842 View commit details

Commits on May 24, 2024

  1. Copy the full SHA
    9982462 View commit details

Commits on Jul 30, 2024

  1. style: Only do code optimization

    侯尧 committed Jul 30, 2024
    Copy the full SHA
    4007ceb View commit details

Commits on Sep 1, 2024

  1. Merge pull request nsqio#369 from qaqhy/optimize_code

    style: Only do code optimization
    mreiferson authored Sep 1, 2024

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    8098697 View commit details

Commits on Jan 27, 2025

  1. Merge pull request nsqio#312 from jehiah/topology_hints_312

    Set topology region+zone and send in IDENTIFY msg
    jehiah authored Jan 27, 2025

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    796abbc View commit details

Commits on Mar 13, 2025

  1. CI updates

    jehiah committed Mar 13, 2025
    Copy the full SHA
    d545341 View commit details
  2. Merge pull request nsqio#376 from jehiah/dep_updates_376

    CI updates
    jehiah authored Mar 13, 2025

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    326de60 View commit details
Showing with 122 additions and 68 deletions.
  1. +6 −6 .github/workflows/test.yml
  2. +2 −2 api_request.go
  3. +14 −8 config.go
  4. +12 −11 conn.go
  5. +18 −15 consumer.go
  6. +3 −3 consumer_test.go
  7. +2 −2 go.mod
  8. +2 −2 go.sum
  9. +1 −1 mock_test.go
  10. +13 −3 producer.go
  11. +25 −2 producer_test.go
  12. +24 −13 protocol.go
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -6,26 +6,26 @@ on:

jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
imgtag:
- "golang:1.15-buster"
- "golang:1.16-buster"
- "golang:1.17-buster"
- "golang:1.23-bullseye"
- "golang:1.24-bullseye"
goarch:
- "amd64"
nsq_ver:
- "nsq-1.1.0.linux-amd64.go1.10.3"
- "nsq-1.2.0.linux-amd64.go1.12.9"
- "nsq-1.2.1.linux-amd64.go1.16.6"
- "nsq-1.3.0.linux-amd64.go1.21.5"
include:
# test 386 only against latest version of NSQ
- imgtag: "golang:1.17-buster"
- imgtag: "golang:1.24-bullseye"
goarch: "386"
nsq_ver: "nsq-1.2.1.linux-amd64.go1.16.6"
nsq_ver: "nsq-1.3.0.linux-amd64.go1.21.5"

container: "${{matrix.imgtag}}"
env:
4 changes: 2 additions & 2 deletions api_request.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ package nsq
import (
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"time"
@@ -47,7 +47,7 @@ func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint stri
return err
}

respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
22 changes: 14 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
@@ -140,6 +139,10 @@ type Config struct {
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`

// Topology hints allow nsqd to prefer same zone and same region consumers
TopologyRegion string `opt:"topology_region"`
TopologyZone string `opt:"topology_zone"`

// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
@@ -177,6 +180,9 @@ type Config struct {
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

// Maximum size of a single message in bytes (0 means no limit)
MaxMsgSize int32 `opt:"max_msg_size" min:"0" default:"0"`

// Secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
// Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries
@@ -205,15 +211,15 @@ func NewConfig() *Config {
//
// Calls to Set() that take a time.Duration as an argument can be input as:
//
// "1000ms" (a string parsed by time.ParseDuration())
// 1000 (an integer interpreted as milliseconds)
// 1000*time.Millisecond (a literal time.Duration value)
// "1000ms" (a string parsed by time.ParseDuration())
// 1000 (an integer interpreted as milliseconds)
// 1000*time.Millisecond (a literal time.Duration value)
//
// Calls to Set() that take bool can be input as:
//
// "true" (a string parsed by strconv.ParseBool())
// true (a boolean)
// 1 (an int where 1 == true and 0 == false)
// "true" (a string parsed by strconv.ParseBool())
// true (a boolean)
// 1 (an int where 1 == true and 0 == false)
//
// It returns an error for an invalid option or value.
func (c *Config) Set(option string, value interface{}) error {
@@ -431,7 +437,7 @@ func (t *tlsConfig) Set(c *Config, option string, value interface{}) error {
return fmt.Errorf("ERROR: %v is not a string", value)
}
tlsCertPool := x509.NewCertPool()
caCertFile, err := ioutil.ReadFile(filename)
caCertFile, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("ERROR: failed to read custom Certificate Authority file %s", err)
}
23 changes: 12 additions & 11 deletions conn.go
Original file line number Diff line number Diff line change
@@ -119,8 +119,7 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string)
//
// Output(calldepth int, s string)
func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
c.logGuard.Lock()
defer c.logGuard.Unlock()
@@ -345,6 +344,8 @@ func (c *Conn) identify() (*IdentifyResponse, error) {
ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond)
}
ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
ci["topology_region"] = c.config.TopologyRegion
ci["topology_zone"] = c.config.TopologyZone
cmd, err := Identify(ci)
if err != nil {
return nil, ErrIdentify{err.Error()}
@@ -355,7 +356,7 @@ func (c *Conn) identify() (*IdentifyResponse, error) {
return nil, ErrIdentify{err.Error()}
}

frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
@@ -434,7 +435,7 @@ func (c *Conn) upgradeTLS(tlsConf *tls.Config) error {
}
c.r = c.tlsConn
c.w = c.tlsConn
frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
@@ -452,7 +453,7 @@ func (c *Conn) upgradeDeflate(level int) error {
fw, _ := flate.NewWriter(conn, level)
c.r = flate.NewReader(conn)
c.w = fw
frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
@@ -468,8 +469,8 @@ func (c *Conn) upgradeSnappy() error {
conn = c.tlsConn
}
c.r = snappy.NewReader(conn)
c.w = snappy.NewWriter(conn)
frameType, data, err := ReadUnpackedResponse(c)
c.w = snappy.NewBufferedWriter(conn)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
@@ -490,7 +491,7 @@ func (c *Conn) auth(secret string) error {
return err
}

frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
@@ -518,7 +519,7 @@ func (c *Conn) readLoop() {
goto exit
}

frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
@@ -692,7 +693,7 @@ func (c *Conn) cleanup() {
msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
}
if msgsInFlight > 0 {
if time.Now().Sub(lastWarning) > time.Second {
if time.Since(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
lastWarning = time.Now()
}
@@ -701,7 +702,7 @@ func (c *Conn) cleanup() {
// until the readLoop has exited we cannot be sure that there
// still won't be a race
if atomic.LoadInt32(&c.readLoopRunning) == 1 {
if time.Now().Sub(lastWarning) > time.Second {
if time.Since(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... readLoop still running")
lastWarning = time.Now()
}
33 changes: 18 additions & 15 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -33,9 +33,9 @@ type Handler interface {
// HandlerFunc is a convenience type to avoid having to declare a struct
// to implement the Handler interface, it can be used like this:
//
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
// // handle the message
// }))
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
// // handle the message
// }))
type HandlerFunc func(message *Message) error

// HandleMessage implements the Handler interface
@@ -220,8 +220,7 @@ func (r *Consumer) conns() []*Conn {
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string) error
//
// Output(calldepth int, s string) error
func (r *Consumer) SetLogger(l logger, lvl LogLevel) {
r.logGuard.Lock()
defer r.logGuard.Unlock()
@@ -266,8 +265,7 @@ func (r *Consumer) getLogLevel() LogLevel {
// of the following interfaces that modify the behavior
// of the `Consumer`:
//
// DiscoveryFilter
//
// DiscoveryFilter
func (r *Consumer) SetBehaviorDelegate(cb interface{}) {
matched := false

@@ -312,7 +310,7 @@ func (r *Consumer) getMaxInFlight() int32 {
// ChangeMaxInFlight sets a new maximum number of messages this comsumer instance
// will allow in-flight, and updates all existing connections as appropriate.
//
// For example, ChangeMaxInFlight(0) would pause message flow
// # For example, ChangeMaxInFlight(0) would pause message flow
//
// If already connected, it updates the reader RDY state for each connection.
func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
@@ -609,8 +607,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.maybeUpdateRDY(c)
if c != conn {
r.maybeUpdateRDY(c)
}
}
r.maybeUpdateRDY(conn)

return nil
}
@@ -912,7 +913,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {

count := r.perConnMaxInFlight()
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
if err := r.updateRDY(conn, count); err != nil {
r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err)
}
}

func (r *Consumer) rdyLoop() {
@@ -1028,8 +1031,8 @@ func (r *Consumer) redistributeRDY() {

possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
lastRdyDuration := time.Now().Sub(c.LastRdyTime())
lastMsgDuration := time.Since(c.LastMessageTime())
lastRdyDuration := time.Since(c.LastRdyTime())
rdyCount := c.RDY()
r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
c.String(), rdyCount, lastMsgDuration)
@@ -1104,7 +1107,7 @@ func (r *Consumer) stopHandlers() {
// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
// # This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {
@@ -1115,7 +1118,7 @@ func (r *Consumer) AddHandler(handler Handler) {
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
// # This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
@@ -1223,7 +1226,7 @@ func buildLookupAddr(addr, topic string) (string, error) {
u.Path = "/lookup"
}

v, err := url.ParseQuery(u.RawQuery)
v, _ := url.ParseQuery(u.RawQuery)
v.Add("topic", topic)
u.RawQuery = v.Encode()
return u.String(), nil
6 changes: 3 additions & 3 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"log"
"net"
"net/http"
@@ -25,7 +25,7 @@ type MyTestHandler struct {
messagesFailed int
}

var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)
var nullLogger = log.New(io.Discard, "", log.LstdFlags)

func (h *MyTestHandler) LogFailedMessage(message *Message) {
h.messagesFailed++
@@ -58,7 +58,7 @@ func (h *MyTestHandler) HandleMessage(message *Message) error {
func SendMessage(t *testing.T, port int, topic string, method string, body []byte) {
httpclient := &http.Client{}
endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", port, method, topic)
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(body))
req, _ := http.NewRequest("POST", endpoint, bytes.NewBuffer(body))
resp, err := httpclient.Do(req)
if err != nil {
t.Fatalf(err.Error())
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/nsqio/go-nsq

go 1.11
go 1.17

require github.com/golang/snappy v0.0.1
require github.com/golang/snappy v0.0.4
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
2 changes: 1 addition & 1 deletion mock_test.go
Original file line number Diff line number Diff line change
@@ -184,7 +184,7 @@ func framedResponse(frameType int32, data []byte) []byte {
return nil
}

_, err = w.Write(data)
_, _ = w.Write(data)
return w.Bytes()
}

16 changes: 13 additions & 3 deletions producer.go
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func NewProducer(addr string, config *Config) (*Producer, error) {

// Set default logger for all log levels
l := log.New(os.Stderr, "", log.Flags())
for index, _ := range p.logger {
for index := range p.logger {
p.logger[index] = l
}
return p, nil
@@ -120,8 +120,7 @@ func (w *Producer) Ping() error {
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string)
//
// Output(calldepth int, s string)
func (w *Producer) SetLogger(l logger, lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()
@@ -367,6 +366,17 @@ exit:
}

func (w *Producer) popTransaction(frameType int32, data []byte) {
if len(w.transactions) == 0 {
dataLen := len(data)
if dataLen > 32 {
data = data[:32]
}
w.log(LogLevelError,
"(%s) unexpected response type=%d len=%d data[:32]=0x%x",
w.conn.String(), frameType, dataLen, data)
w.close()
return
}
t := w.transactions[0]
w.transactions = w.transactions[1:]
if frameType == FrameTypeError {
Loading