Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd/nsqlookupd: exit when tcp server accept error #1138

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions internal/protocol/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ type TCPHandler interface {
Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

var fatalErr error
for {
clientConn, err := listener.Accept()
if err != nil {
Expand All @@ -25,12 +26,14 @@ func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "listener.Accept() - %s", err)
logf(lg.FATAL, "listener.Accept() - %s", err)
fatalErr = err
}
break
}
go handler.Handle(clientConn)
}

logf(lg.INFO, "TCP: closing %s", listener.Addr())
return fatalErr
}
13 changes: 12 additions & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type NSQD struct {
exitChan chan int
waitGroup util.WaitGroupWrapper

shuttingDown bool

ci *clusterinfo.ClusterInfo
}

Expand Down Expand Up @@ -263,7 +265,9 @@ func (n *NSQD) Main() {

tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
if err := protocol.TCPServer(n.tcpListener, tcpServer, n.logf); err != nil {
go n.Exit()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest checking the exiting flag here, and if it is not already set, then go Exit().

The flag variable could be an atomic int, to avoid the need to take a lock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this suggestion would require Exit() to set the flag as the first thing it does)

Copy link
Member Author

@andyxning andyxning Feb 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we keep the check logic be within the Exit function. With this we should not take care about the internal exit status flag check before Exit is called in other places. And we should keep Exit function to be called only once. Without lock protection, even with atomic values, the set and check logic exists race condition.

With setting and checking separated with aotmic values, race condition may happens. For example, a normal exit and accept error exit happens simultaneously, before the normal exit call set the Exit flag, accept error exit checks it and then call Exit again. Although nsqd will exit correctly but Exit is called more than once.

With lock like this. Exit function logic except closing listeners will only be called once.

Copy link
Member Author

@andyxning andyxning Feb 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, lock consumption should be ok and will not invoke any new performance problems, because all this happens under exit phase.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I suggested the exiting flag early on is because deciding to call Exit here could be as simple as:

--- a/nsqd/nsqd.go
+++ b/nsqd/nsqd.go
@@ -71,6 +71,7 @@ type NSQD struct {
 
        notifyChan           chan interface{}
        optsNotificationChan chan struct{}
+       exitFlag             int32
        exitChan             chan int
        waitGroup            util.WaitGroupWrapper
 
@@ -264,6 +265,10 @@ func (n *NSQD) Main() {
        tcpServer := &tcpServer{ctx: ctx}
        n.waitGroup.Wrap(func() {
                protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
+               if atomic.LoadInt32(&t.exitFlag) == 0 {
+                       // abnormal listen loop exit
+                       go n.Exit()
+               }
        })
        httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
        n.waitGroup.Wrap(func() {
@@ -423,6 +428,10 @@ func (n *NSQD) PersistMetadata() error {
 }
 
 func (n *NSQD) Exit() {
+       if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
+               return
+       }
+
        if n.tcpListener != nil {
                n.tcpListener.Close()
        }

(but there still is the issue of exiting with an error code)

Copy link
Member

@mreiferson mreiferson Feb 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that the atomic would be slightly cleaner, but I prefer directing flow through the existing machinery that's setup to manage the lifecycle of the service to ensure that, whatever svc.Run is doing, it gets done (rather than introducing a new exceptional path).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah ... calling NSQD.Exit() cleans up but does not actually exit ... go-svc needs to be involved anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more cheesy idea to avoid needing to deal with go-svc:

--- a/nsqd/nsqd.go
+++ b/nsqd/nsqd.go
@@ -264,6 +264,12 @@ func (n *NSQD) Main() {
        tcpServer := &tcpServer{ctx: ctx}
        n.waitGroup.Wrap(func() {
                protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
+               if atomic.LoadInt32(&t.exitFlag) == 0 {
+                       go func() {
+                               n.Exit()
+                               os.Exit(1)
+                       }()
+               }
        })

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meh, let's just do it the "right" way.

}
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
Expand Down Expand Up @@ -436,6 +440,13 @@ func (n *NSQD) Exit() {
}

n.Lock()

if n.shuttingDown {
n.Unlock()
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should unlock if returning here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

}
n.shuttingDown = true

err := n.PersistMetadata()
if err != nil {
n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
Expand Down
17 changes: 16 additions & 1 deletion nsqlookupd/nsqlookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type NSQLookupd struct {
httpListener net.Listener
waitGroup util.WaitGroupWrapper
DB *RegistrationDB

shuttingDown bool
}

func New(opts *Options) *NSQLookupd {
Expand Down Expand Up @@ -62,7 +64,9 @@ func (l *NSQLookupd) Main() error {

tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
if err := protocol.TCPServer(tcpListener, tcpServer, l.logf); err != nil {
go l.Exit()
}
})
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
Expand All @@ -88,5 +92,16 @@ func (l *NSQLookupd) Exit() {
if l.httpListener != nil {
l.httpListener.Close()
}

l.Lock()

if l.shuttingDown {
l.Unlock()
return
}
l.shuttingDown = true

l.Unlock()

l.waitGroup.Wait()
}