Reconnect logic

This commit is contained in:
Derek Collison
2013-07-29 20:02:47 -07:00
parent 0adb1f443b
commit de469cada6
4 changed files with 62 additions and 10 deletions

View File

@@ -752,6 +752,7 @@ func (c *client) closeConnection() {
c.clearPingTimer()
c.clearConnection()
// Snapshot for use.
subs := c.subs.All()
srv := c.srv
@@ -761,11 +762,16 @@ func (c *client) closeConnection() {
// Unregister
srv.removeClient(c)
// Remove subscriptions.
// Remove clients subscriptions.
for _, s := range subs {
if sub, ok := s.(*subscription); ok {
srv.sl.Remove(sub.subject, sub)
}
}
}
// Check for a solicited route. If it was, start up a reconnect.
if c.isSolicitedRoute() {
go srv.connectToRoute(c.route.url)
}
}

View File

@@ -13,7 +13,6 @@ import (
type route struct {
remoteId string
didSolicit bool
sid uint64
url *url.URL
}
@@ -222,6 +221,12 @@ func (s *Server) connectToRoute(rUrl *url.URL) {
}
}
func (c *client) isSolicitedRoute() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.typ == ROUTER && c.route != nil && c.route.didSolicit
}
func (s *Server) solicitRoutes() {
for _, r := range s.opts.Routes {
go s.connectToRoute(r)

View File

@@ -18,8 +18,8 @@ func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
// Override for running in Go routine.
opts.NoSigs = true
// opts.Debug = true
// opts.Trace = true
//opts.Debug = true
//opts.Trace = true
opts.NoLog = true
if err != nil {
@@ -148,7 +148,6 @@ func TestRouteForwardsMsgFromClients(t *testing.T) {
// Eat the CONNECT and INFO protos
buf := routeExpect(connectRe)
if !inlineInfoRe.Match(buf) {
fmt.Printf("Looking for separate INFO\n")
routeExpect(infoRe)
}
@@ -239,7 +238,7 @@ func TestRouteQueueSemantics(t *testing.T) {
client := createClientConn(t, opts.Host, opts.Port)
defer client.Close()
clientSend, _ := setupConn(t, client)
clientSend, clientExpect := setupConn(t, client)
route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
expectAuthRequired(t, route)
@@ -250,12 +249,52 @@ func TestRouteQueueSemantics(t *testing.T) {
routeSend("SUB foo bar RSID:2:1\r\n")
routeSend("SUB foo bar RSID:2:2\r\n")
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
// Only 1
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok")
// Normal Interest as well.
routeSend("SUB foo RSID:2:1\r\n")
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok")
}
// Still only 1
expectMsgs(1)
// Subscribe to foo on client
clientSend("SUB foo bar 1\r\n")
// Use ping roundtrip to make sure its processed.
clientSend("PING\r\n")
clientExpect(pongRe)
// Receive notification on route
routeExpect(subRe)
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
// Still only 1 for route
expectMsgs(1)
// We could get one on client
}
func TestSolicitRouteReconnect(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
rUrl := opts.Routes[0]
route := acceptRouteConn(t, rUrl.Host, server.DEFAULT_ROUTE_CONNECT)
// Go ahead and close the Route.
route.Close()
// We expect to get called back..
route = acceptRouteConn(t, rUrl.Host, 2*server.DEFAULT_ROUTE_CONNECT)
}

View File

@@ -139,8 +139,10 @@ func stackFatalf(t tLogger, f string, args ...interface{}) {
func acceptRouteConn(t tLogger, host string, timeout time.Duration) net.Conn {
l, e := net.Listen("tcp", host)
if e != nil {
stackFatalf(t, "Error listening for route connection on %v", host)
stackFatalf(t, "Error listening for route connection on %v: %v", host, e)
}
defer l.Close()
tl := l.(*net.TCPListener)
tl.SetDeadline(time.Now().Add(timeout))