Skip to content

Commit

Permalink
nsqd: improve test coverage
Browse files Browse the repository at this point in the history
Adds test cases to improve code coverage to protect against potential
compatibility conflicts moving forward.

Existing test cases refactored to leverage common test helpers within
the internal/test package to avoid code duplication and simplify
the testing effort.
  • Loading branch information
kenjones-cisco committed Aug 9, 2016
1 parent f436cd7 commit 5b684bc
Show file tree
Hide file tree
Showing 13 changed files with 1,037 additions and 838 deletions.
80 changes: 41 additions & 39 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"strconv"
"testing"
"time"

"github.com/nsqio/nsq/internal/test"
)

// ensure that we can push a message through a topic and get it out of a channel
func TestPutMessage(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
Expand All @@ -27,14 +29,14 @@ func TestPutMessage(t *testing.T) {
topic.PutMessage(msg)

outputMsg := <-channel1.memoryMsgChan
equal(t, msg.ID, outputMsg.ID)
equal(t, msg.Body, outputMsg.Body)
test.Equal(t, msg.ID, outputMsg.ID)
test.Equal(t, msg.Body, outputMsg.Body)
}

// ensure that both channels get the same message
func TestPutMessage2Chan(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
Expand All @@ -49,17 +51,17 @@ func TestPutMessage2Chan(t *testing.T) {
topic.PutMessage(msg)

outputMsg1 := <-channel1.memoryMsgChan
equal(t, msg.ID, outputMsg1.ID)
equal(t, msg.Body, outputMsg1.Body)
test.Equal(t, msg.ID, outputMsg1.ID)
test.Equal(t, msg.Body, outputMsg1.Body)

outputMsg2 := <-channel2.memoryMsgChan
equal(t, msg.ID, outputMsg2.ID)
equal(t, msg.Body, outputMsg2.Body)
test.Equal(t, msg.ID, outputMsg2.ID)
test.Equal(t, msg.Body, outputMsg2.Body)
}

func TestChannelBackendMaxMsgSize(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
Expand All @@ -68,14 +70,14 @@ func TestChannelBackendMaxMsgSize(t *testing.T) {
topic := nsqd.GetTopic(topicName)
ch := topic.GetChannel("ch")

equal(t, ch.backend.(*diskQueue).maxMsgSize, int32(opts.MaxMsgSize+minValidMsgLength))
test.Equal(t, int32(opts.MaxMsgSize+minValidMsgLength), ch.backend.(*diskQueue).maxMsgSize)
}

func TestInFlightWorker(t *testing.T) {
count := 250

opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
opts.MsgTimeout = 100 * time.Millisecond
opts.QueueScanRefreshInterval = 100 * time.Millisecond
_, _, nsqd := mustStartNSQD(opts)
Expand All @@ -94,12 +96,12 @@ func TestInFlightWorker(t *testing.T) {
channel.Lock()
inFlightMsgs := len(channel.inFlightMessages)
channel.Unlock()
equal(t, inFlightMsgs, count)
test.Equal(t, count, inFlightMsgs)

channel.inFlightMutex.Lock()
inFlightPQMsgs := len(channel.inFlightPQ)
channel.inFlightMutex.Unlock()
equal(t, inFlightPQMsgs, count)
test.Equal(t, count, inFlightPQMsgs)

// the in flight worker has a resolution of 100ms so we need to wait
// at least that much longer than our msgTimeout (in worst case)
Expand All @@ -108,17 +110,17 @@ func TestInFlightWorker(t *testing.T) {
channel.Lock()
inFlightMsgs = len(channel.inFlightMessages)
channel.Unlock()
equal(t, inFlightMsgs, 0)
test.Equal(t, 0, inFlightMsgs)

channel.inFlightMutex.Lock()
inFlightPQMsgs = len(channel.inFlightPQ)
channel.inFlightMutex.Unlock()
equal(t, inFlightPQMsgs, 0)
test.Equal(t, 0, inFlightPQMsgs)
}

func TestChannelEmpty(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
Expand All @@ -135,23 +137,23 @@ func TestChannelEmpty(t *testing.T) {
}

channel.RequeueMessage(0, msgs[len(msgs)-1].ID, 100*time.Millisecond)
equal(t, len(channel.inFlightMessages), 24)
equal(t, len(channel.inFlightPQ), 24)
equal(t, len(channel.deferredMessages), 1)
equal(t, len(channel.deferredPQ), 1)
test.Equal(t, 24, len(channel.inFlightMessages))
test.Equal(t, 24, len(channel.inFlightPQ))
test.Equal(t, 1, len(channel.deferredMessages))
test.Equal(t, 1, len(channel.deferredPQ))

channel.Empty()

equal(t, len(channel.inFlightMessages), 0)
equal(t, len(channel.inFlightPQ), 0)
equal(t, len(channel.deferredMessages), 0)
equal(t, len(channel.deferredPQ), 0)
equal(t, channel.Depth(), int64(0))
test.Equal(t, 0, len(channel.inFlightMessages))
test.Equal(t, 0, len(channel.inFlightPQ))
test.Equal(t, 0, len(channel.deferredMessages))
test.Equal(t, 0, len(channel.deferredPQ))
test.Equal(t, int64(0), channel.Depth())
}

func TestChannelEmptyConsumer(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
Expand All @@ -174,20 +176,20 @@ func TestChannelEmptyConsumer(t *testing.T) {

for _, cl := range channel.clients {
stats := cl.Stats()
equal(t, stats.InFlightCount, int64(25))
test.Equal(t, int64(25), stats.InFlightCount)
}

channel.Empty()

for _, cl := range channel.clients {
stats := cl.Stats()
equal(t, stats.InFlightCount, int64(0))
test.Equal(t, int64(0), stats.InFlightCount)
}
}

func TestChannelHealth(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Logger = test.NewTestLogger(t)
opts.MemQueueSize = 2

_, httpAddr, nsqd := mustStartNSQD(opts)
Expand All @@ -202,34 +204,34 @@ func TestChannelHealth(t *testing.T) {

msg := NewMessage(<-nsqd.idChan, make([]byte, 100))
err := channel.PutMessage(msg)
equal(t, err, nil)
test.Nil(t, err)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = channel.PutMessage(msg)
equal(t, err, nil)
test.Nil(t, err)

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = channel.PutMessage(msg)
nequal(t, err, nil)
test.NotNil(t, err)

url := fmt.Sprintf("http://%s/ping", httpAddr)
resp, err := http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 500)
test.Nil(t, err)
test.Equal(t, 500, resp.StatusCode)
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "NOK - never gonna happen")
test.Equal(t, "NOK - never gonna happen", string(body))

channel.backend = &errorRecoveredBackendQueue{}

msg = NewMessage(<-nsqd.idChan, make([]byte, 100))
err = channel.PutMessage(msg)
equal(t, err, nil)
test.Nil(t, err)

resp, err = http.Get(url)
equal(t, err, nil)
equal(t, resp.StatusCode, 200)
test.Nil(t, err)
test.Equal(t, 200, resp.StatusCode)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
equal(t, string(body), "OK")
test.Equal(t, "OK", string(body))
}
4 changes: 2 additions & 2 deletions nsqd/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ type diskQueue struct {
exitChan chan int
exitSyncChan chan int

logger logger
logger Logger
}

// newDiskQueue instantiates a new instance of diskQueue, retrieving metadata
// from the filesystem and starting the read ahead goroutine
func newDiskQueue(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration,
logger logger) BackendQueue {
logger Logger) BackendQueue {
d := diskQueue{
name: name,
dataPath: dataPath,
Expand Down
Loading

0 comments on commit 5b684bc

Please sign in to comment.