diff --git a/server/const.go b/server/const.go index fddd48f4..caa4921e 100644 --- a/server/const.go +++ b/server/const.go @@ -49,7 +49,7 @@ const ( DEFAULT_ROUTE_CONNECT = 1 * time.Second // Route dial timeout - DEFAULT_ROUTE_DIAL = 2 * time.Second + DEFAULT_ROUTE_DIAL = 1 * time.Second // Default size of proto to print on parse errors PROTO_SNIPPET_SIZE = 32 diff --git a/server/route.go b/server/route.go index 1efe673b..cc6bb0e8 100644 --- a/server/route.go +++ b/server/route.go @@ -76,6 +76,8 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client { Debug("Route connection created", clientConnStr(c.nc), c.cid) + c.mu.Lock() + // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rUrl @@ -91,6 +93,7 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client { ttl := secondsToDuration(s.opts.ClusterAuthTimeout) c.setAuthTimer(ttl) } + c.mu.Unlock() // Register with the server. s.mu.Lock() @@ -260,19 +263,21 @@ func (s *Server) StartRouting() { s.solicitRoutes() } -// FIXME(dlc): Need to shutdown when exiting func (s *Server) connectToRoute(rUrl *url.URL) { for s.isRunning() { Debugf("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) if err != nil { Debugf("Error trying to connect to route: %v", err) - // FIXME(dlc): wait on kick out - time.Sleep(DEFAULT_ROUTE_CONNECT) - continue + select { + case <-s.rcQuit: + return + case <-time.After(DEFAULT_ROUTE_CONNECT): + continue + } } - // We have a connection here. Go ahead and create it and - // exit this func. + // We have a route connection here. + // Go ahead and create it and exit this func. s.createRoute(conn, rUrl) return } diff --git a/server/server.go b/server/server.go index bfb73787..be5ccfd9 100644 --- a/server/server.go +++ b/server/server.go @@ -46,6 +46,7 @@ type Server struct { grid uint64 routeInfo Info routeInfoJson []byte + rcQuit chan bool } type stats struct { @@ -92,6 +93,10 @@ func New(opts *Options) *Server { // For tracking routes s.routes = make(map[uint64]*client) + // Used to kick out all of the route + // connect Go routines. + s.rcQuit = make(chan bool) + // Generate the info json b, err := json.Marshal(s.info) if err != nil { @@ -160,12 +165,16 @@ func (s *Server) Shutdown() { s.listener = nil } + // Kick route AcceptLoop() if s.routeListener != nil { doneExpected++ s.routeListener.Close() s.routeListener = nil } + // Release the solicited routes connect go routines. + close(s.rcQuit) + s.mu.Unlock() // Close client connections diff --git a/test/routes_test.go b/test/routes_test.go index bc3b2f6b..6d271053 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -5,6 +5,7 @@ package test import ( "encoding/json" "fmt" + "io/ioutil" "runtime" "strings" "testing" @@ -18,9 +19,9 @@ func runRouteServer(t *testing.T) (*server.Server, *server.Options) { // Override for running in Go routine. opts.NoSigs = true - opts.Debug = true - opts.Trace = true - //opts.NoLog = true + opts.Debug = true + opts.Trace = true + opts.NoLog = true if err != nil { t.Fatalf("Error parsing config file: %v\n", err) @@ -44,6 +45,7 @@ func TestRouteGoServerShutdown(t *testing.T) { time.Sleep(10 * time.Millisecond) delta := (runtime.NumGoroutine() - base) if delta > 1 { + panic("foo") t.Fatalf("%d Go routines still exist post Shutdown()", delta) } } @@ -210,7 +212,7 @@ func TestRouteOnlySendOnce(t *testing.T) { client := createClientConn(t, opts.Host, opts.Port) defer client.Close() - clientSend, _ := setupConn(t, client) + clientSend, clientExpect := setupConn(t, client) route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) expectAuthRequired(t, route) @@ -220,9 +222,13 @@ func TestRouteOnlySendOnce(t *testing.T) { // Express multiple interest on this route for foo. routeSend("SUB foo RSID:2:1\r\n") routeSend("SUB foo RSID:2:2\r\n") + routeSend("PING\r\n") + routeExpect(pongRe) // Send PUB via client connection clientSend("PUB foo 2\r\nok\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) matches := expectMsgs(1) checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok") @@ -339,12 +345,13 @@ func TestMultipleRoutesSameId(t *testing.T) { route1 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) expectAuthRequired(t, route1) - route1Send, route1Expect := setupRouteEx(t, route1, opts, "ROUTE:2222") - route1ExpectMsgs := expectMsgsCommand(t, route1Expect) + route1Send, _ := setupRouteEx(t, route1, opts, "ROUTE:2222") +// route1ExpectMsgs := expectMsgsCommand(t, route1Expect) route2 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) expectAuthRequired(t, route2) route2Send, _ := setupRouteEx(t, route2, opts, "ROUTE:2222") +// route2ExpectMsgs := expectMsgsCommand(t, route1Expect) // Send SUB via route connections sub := "SUB foo RSID:2:22\r\n" @@ -360,17 +367,33 @@ func TestMultipleRoutesSameId(t *testing.T) { // Setup a client client := createClientConn(t, opts.Host, opts.Port) - clientSend, _ := setupConn(t, client) + clientSend, clientExpect := setupConn(t, client) defer client.Close() // Send PUB via client connection clientSend("PUB foo 2\r\nok\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) - // We should only receive on the first route. - route1ExpectMsgs(1) + // We should only receive on one route, not both. + // Check both manually. + route1.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + buf, _ := ioutil.ReadAll(route1) + route1.SetReadDeadline(time.Time{}) + if len(buf) <= 0 { + route2.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + buf, _ = ioutil.ReadAll(route2) + route2.SetReadDeadline(time.Time{}) + if len(buf) <= 0 { + t.Fatal("Expected to get one message on a route, received none.") + } + } - // Nothing on the second. - expectNothing(t, route2) + matches := msgRe.FindAllSubmatch(buf, -1) + if len(matches) != 1 { + t.Fatalf("Expected 1 msg, got %d\n", len(matches)) + } + checkMsg(t, matches[0], "foo", "", "", "2", "ok") } func TestRouteResendsLocalSubsOnReconnect(t *testing.T) { diff --git a/test/test.go b/test/test.go index e21dc46b..d5f81866 100644 --- a/test/test.go +++ b/test/test.go @@ -286,7 +286,7 @@ var expBuf = make([]byte, 32768) // Test result from server against regexp func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { // Wait for commands to be processed and results queued for read - c.SetReadDeadline(time.Now().Add(2 * time.Second)) + c.SetReadDeadline(time.Now().Add(1 * time.Second)) defer c.SetReadDeadline(time.Time{}) n, err := c.Read(expBuf)