Skip to content

Commit

Permalink
nsqd/nsqlookupd: exit when tcp server accept error
Browse files Browse the repository at this point in the history
  • Loading branch information
mdh67899 committed Feb 27, 2019
1 parent fd1cde1 commit 1ee8ecc
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 5 deletions.
24 changes: 23 additions & 1 deletion apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -183,10 +184,24 @@ func (cfg config) Validate() {

type program struct {
nsqd *nsqd.NSQD
lock *sync.RWMutex
}

func main() {
prg := &program{}
prg := &program{
nsqd: nil,
lock: new(sync.RWMutex),
}
go func() {
for {
if prg.nsqd != nil {
<-prg.nsqd.GetfatalErrChan()
prg.Stop()
os.Exit(1)
}
time.Sleep(time.Second * 3)
}
}()
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
Expand All @@ -201,6 +216,9 @@ func (p *program) Init(env svc.Environment) error {
}

func (p *program) Start() error {
p.lock.Lock()
defer p.lock.Unlock()

opts := nsqd.NewOptions()

flagSet := nsqdFlagSet(opts)
Expand Down Expand Up @@ -241,8 +259,12 @@ func (p *program) Start() error {
}

func (p *program) Stop() error {
p.lock.Lock()
defer p.lock.Unlock()

if p.nsqd != nil {
p.nsqd.Exit()
p.nsqd = nil
}
return nil
}
25 changes: 24 additions & 1 deletion apps/nsqlookupd/nsqlookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"log"
"os"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
Expand Down Expand Up @@ -37,10 +39,24 @@ func nsqlookupdFlagSet(opts *nsqlookupd.Options) *flag.FlagSet {

type program struct {
nsqlookupd *nsqlookupd.NSQLookupd
lock *sync.RWMutex
}

func main() {
prg := &program{}
prg := &program{
nsqlookupd: nil,
lock: new(sync.RWMutex),
}
go func() {
for {
if prg.nsqlookupd != nil {
<-prg.nsqlookupd.GetfatalErrChan()
prg.Stop()
os.Exit(1)
}
time.Sleep(time.Second * 3)
}
}()
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
Expand All @@ -55,6 +71,9 @@ func (p *program) Init(env svc.Environment) error {
}

func (p *program) Start() error {
p.lock.Lock()
defer p.lock.Unlock()

opts := nsqlookupd.NewOptions()

flagSet := nsqlookupdFlagSet(opts)
Expand Down Expand Up @@ -87,8 +106,12 @@ func (p *program) Start() error {
}

func (p *program) Stop() error {
p.lock.Lock()
defer p.lock.Unlock()

if p.nsqlookupd != nil {
p.nsqlookupd.Exit()
p.nsqlookupd = nil
}
return nil
}
2 changes: 1 addition & 1 deletion internal/protocol/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "listener.Accept() - %s", err)
logf(lg.FATAL, "listener.Accept() - %s", err)
}
break
}
Expand Down
18 changes: 18 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type NSQD struct {
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
fatalErrChan chan struct{}
fatalErrOnce *sync.Once
waitGroup util.WaitGroupWrapper

ci *clusterinfo.ClusterInfo
Expand All @@ -92,6 +94,8 @@ func New(opts *Options) *NSQD {
topicMap: make(map[string]*Topic),
clients: make(map[int64]Client),
exitChan: make(chan int),
fatalErrChan: make(chan struct{}),
fatalErrOnce: new(sync.Once),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
dl: dirlock.New(dataPath),
Expand Down Expand Up @@ -264,15 +268,18 @@ func (n *NSQD) Main() {
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
n.fatalErrorNotify()
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
n.fatalErrorNotify()
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
n.fatalErrorNotify()
})
}

Expand Down Expand Up @@ -453,6 +460,17 @@ func (n *NSQD) Exit() {
n.logf(LOG_INFO, "NSQ: bye")
}

func (n *NSQD) fatalErrorNotify() {
n.fatalErrOnce.Do(
func() {
close(n.fatalErrChan)
})
}

func (n *NSQD) GetfatalErrChan() chan struct{} {
return n.fatalErrChan
}

// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
Expand Down
21 changes: 19 additions & 2 deletions nsqlookupd/nsqlookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type NSQLookupd struct {
opts *Options
tcpListener net.Listener
httpListener net.Listener
fatalErrChan chan struct{}
fatalErrOnce *sync.Once
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
}
Expand All @@ -28,8 +30,10 @@ func New(opts *Options) *NSQLookupd {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
n := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB(),
opts: opts,
DB: NewRegistrationDB(),
fatalErrChan: make(chan struct{}),
fatalErrOnce: new(sync.Once),
}

var err error
Expand Down Expand Up @@ -63,10 +67,12 @@ func (l *NSQLookupd) Main() error {
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
l.fatalErrorNotify()
})
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
l.fatalErrorNotify()
})

return nil
Expand All @@ -90,3 +96,14 @@ func (l *NSQLookupd) Exit() {
}
l.waitGroup.Wait()
}

func (l *NSQLookupd) fatalErrorNotify() {
l.fatalErrOnce.Do(
func() {
close(l.fatalErrChan)
})
}

func (l *NSQLookupd) GetfatalErrChan() chan struct{} {
return l.fatalErrChan
}

0 comments on commit 1ee8ecc

Please sign in to comment.