diff --git a/server/client.go b/server/client.go index 541e8a88..abf26d97 100644 --- a/server/client.go +++ b/server/client.go @@ -752,6 +752,7 @@ func (c *client) closeConnection() { c.clearPingTimer() c.clearConnection() + // Snapshot for use. subs := c.subs.All() srv := c.srv @@ -761,11 +762,16 @@ func (c *client) closeConnection() { // Unregister srv.removeClient(c) - // Remove subscriptions. + // Remove clients subscriptions. for _, s := range subs { if sub, ok := s.(*subscription); ok { srv.sl.Remove(sub.subject, sub) } } } + + // Check for a solicited route. If it was, start up a reconnect. + if c.isSolicitedRoute() { + go srv.connectToRoute(c.route.url) + } } diff --git a/server/route.go b/server/route.go index 0a994926..74a808e7 100644 --- a/server/route.go +++ b/server/route.go @@ -13,7 +13,6 @@ import ( type route struct { remoteId string didSolicit bool - sid uint64 url *url.URL } @@ -222,6 +221,12 @@ func (s *Server) connectToRoute(rUrl *url.URL) { } } +func (c *client) isSolicitedRoute() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.typ == ROUTER && c.route != nil && c.route.didSolicit +} + func (s *Server) solicitRoutes() { for _, r := range s.opts.Routes { go s.connectToRoute(r) diff --git a/test/routes_test.go b/test/routes_test.go index 2c7287b7..a92b8152 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -18,8 +18,8 @@ func runRouteServer(t *testing.T) (*server.Server, *server.Options) { // Override for running in Go routine. opts.NoSigs = true - // opts.Debug = true - // opts.Trace = true + //opts.Debug = true + //opts.Trace = true opts.NoLog = true if err != nil { @@ -148,7 +148,6 @@ func TestRouteForwardsMsgFromClients(t *testing.T) { // Eat the CONNECT and INFO protos buf := routeExpect(connectRe) if !inlineInfoRe.Match(buf) { - fmt.Printf("Looking for separate INFO\n") routeExpect(infoRe) } @@ -239,7 +238,7 @@ func TestRouteQueueSemantics(t *testing.T) { client := createClientConn(t, opts.Host, opts.Port) defer client.Close() - clientSend, _ := setupConn(t, client) + clientSend, clientExpect := setupConn(t, client) route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) expectAuthRequired(t, route) @@ -250,12 +249,52 @@ func TestRouteQueueSemantics(t *testing.T) { routeSend("SUB foo bar RSID:2:1\r\n") routeSend("SUB foo bar RSID:2:2\r\n") + // Send PUB via client connection + clientSend("PUB foo 2\r\nok\r\n") + + // Only 1 + matches := expectMsgs(1) + checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok") + // Normal Interest as well. routeSend("SUB foo RSID:2:1\r\n") // Send PUB via client connection clientSend("PUB foo 2\r\nok\r\n") - matches := expectMsgs(1) - checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok") -} \ No newline at end of file + // Still only 1 + expectMsgs(1) + + // Subscribe to foo on client + clientSend("SUB foo bar 1\r\n") + // Use ping roundtrip to make sure its processed. + clientSend("PING\r\n") + clientExpect(pongRe) + + // Receive notification on route + routeExpect(subRe) + + // Send PUB via client connection + clientSend("PUB foo 2\r\nok\r\n") + + // Still only 1 for route + expectMsgs(1) + + // We could get one on client + +} + +func TestSolicitRouteReconnect(t *testing.T) { + s, opts := runRouteServer(t) + defer s.Shutdown() + + rUrl := opts.Routes[0] + + route := acceptRouteConn(t, rUrl.Host, server.DEFAULT_ROUTE_CONNECT) + + // Go ahead and close the Route. + route.Close() + + // We expect to get called back.. + route = acceptRouteConn(t, rUrl.Host, 2*server.DEFAULT_ROUTE_CONNECT) +} diff --git a/test/test.go b/test/test.go index 1bc6ca13..05ad575d 100644 --- a/test/test.go +++ b/test/test.go @@ -139,8 +139,10 @@ func stackFatalf(t tLogger, f string, args ...interface{}) { func acceptRouteConn(t tLogger, host string, timeout time.Duration) net.Conn { l, e := net.Listen("tcp", host) if e != nil { - stackFatalf(t, "Error listening for route connection on %v", host) + stackFatalf(t, "Error listening for route connection on %v: %v", host, e) } + defer l.Close() + tl := l.(*net.TCPListener) tl.SetDeadline(time.Now().Add(timeout))