Merge pull request #324 from nats-io/fix_route_ipv6

[FIXED] Cluster's listener with IPv6
This commit is contained in:
Derek Collison
2016-08-12 15:20:36 -07:00
committed by GitHub
4 changed files with 89 additions and 2 deletions

View File

@@ -584,11 +584,13 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
}
func (s *Server) routeAcceptLoop(ch chan struct{}) {
hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort)
hp := net.JoinHostPort(s.opts.ClusterHost, strconv.Itoa(s.opts.ClusterPort))
Noticef("Listening for route connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
Fatalf("Error listening on router port: %d - %v", s.opts.Port, e)
// We need to close this channel to avoid a deadlock
close(ch)
Fatalf("Error listening on router port: %d - %v", s.opts.ClusterPort, e)
return
}

View File

@@ -4,12 +4,14 @@ package server
import (
"fmt"
"net"
"net/url"
"reflect"
"testing"
"time"
"github.com/nats-io/nats"
"strconv"
)
func TestRouteConfig(t *testing.T) {
@@ -406,3 +408,58 @@ func TestRouteTLSHandshakeError(t *testing.T) {
t.Fatal("Route should have failed")
}
}
func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) {
opts := DefaultOptions
opts.ClusterHost = "x.x.x.x"
opts.ClusterPort = 7222
s := New(&opts)
go s.Start()
// Wait a second
time.Sleep(time.Second)
ch := make(chan bool)
go func() {
s.Shutdown()
ch <- true
}()
timeout := time.NewTimer(5 * time.Second)
select {
case <-ch:
return
case <-timeout.C:
t.Fatal("Shutdown did not complete")
}
}
func TestRouteUseIPv6(t *testing.T) {
opts := DefaultOptions
opts.ClusterHost = "::"
opts.ClusterPort = 6222
// I believe that there is no IPv6 support on Travis...
// Regardless, cannot have this test fail simply because IPv6 is disabled
// on the host.
hp := net.JoinHostPort(opts.ClusterHost, strconv.Itoa(opts.ClusterPort))
_, err := net.ResolveTCPAddr("tcp", hp)
if err != nil {
t.Skipf("Skipping this test since there is no IPv6 support on this host: %v", err)
}
s := RunServer(&opts)
defer s.Shutdown()
routeUp := false
timeout := time.Now().Add(5 * time.Second)
for time.Now().Before(timeout) && !routeUp {
if s.GetRouteListenEndpoint() == "" {
time.Sleep(time.Second)
continue
}
routeUp = true
}
if !routeUp {
t.Fatal("Server failed to start route accept loop")
}
}

View File

@@ -218,7 +218,11 @@ func (s *Server) Start() {
Noticef("Starting nats-server version %s", VERSION)
Debugf("Go build version %s", s.info.GoVersion)
// Avoid RACE between Start() and Shutdown()
s.mu.Lock()
s.running = true
s.mu.Unlock()
s.grMu.Lock()
s.grRunning = true
s.grMu.Unlock()
@@ -342,6 +346,14 @@ func (s *Server) Shutdown() {
// AcceptLoop is exported for easier testing.
func (s *Server) AcceptLoop(clr chan struct{}) {
// If we were to exit before the listener is setup properly,
// make sure we close the channel.
defer func() {
if clr != nil {
close(clr)
}
}()
hp := net.JoinHostPort(s.opts.Host, strconv.Itoa(s.opts.Port))
Noticef("Listening for client connections on %s", hp)
l, e := net.Listen("tcp", hp)
@@ -384,6 +396,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
// Let the caller know that we are ready
close(clr)
clr = nil
tmpDelay := ACCEPT_MIN_SLEEP

View File

@@ -201,3 +201,18 @@ func TestGetConnectURLs(t *testing.T) {
checkConnectURLsHasOnlyOne()
}
}
func TestNoDeadlockOnStartFailure(t *testing.T) {
opts := DefaultOptions
opts.Host = "x.x.x.x" // bad host
opts.Port = 4222
opts.ClusterHost = "localhost"
opts.ClusterPort = 6222
s := New(&opts)
// This should return since it should fail to start a listener
// on x.x.x.x:4222
s.Start()
// We should be able to shutdown
s.Shutdown()
}