diff --git a/server/route.go b/server/route.go index 8e5784c0..1efe673b 100644 --- a/server/route.go +++ b/server/route.go @@ -52,6 +52,20 @@ func (c *client) sendConnect() { c.bw.Flush() } +func (s *Server) sendLocalSubsToRoute(route *client) { + for _, client := range s.clients { + for _, s := range client.subs.All() { + if sub, ok := s.(*subscription); ok { + rsid := routeSid(sub) + proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid) + route.bw.WriteString(proto) + } + } + } + route.bw.Flush() + Debug("Route sent local subscriptions", clientConnStr(route.nc), route.cid) +} + func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client { didSolicit := rUrl != nil r := &route{didSolicit: didSolicit} @@ -65,13 +79,14 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client { // Queue Connect proto if we solicited the connection. if didSolicit { r.url = rUrl + Debug("Route connect msg sent", clientConnStr(c.nc), c.cid) c.sendConnect() } // Send our info to the other side. s.sendInfo(c) - // Check for Auth + // Check for Auth required state for incoming connections. if s.routeInfo.AuthRequired && !didSolicit { ttl := secondsToDuration(s.opts.ClusterAuthTimeout) c.setAuthTimer(ttl) @@ -82,6 +97,9 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client { s.routes[c.cid] = c s.mu.Unlock() + // Send our local subscriptions to this route. + s.sendLocalSubsToRoute(c) + return c } @@ -100,8 +118,9 @@ const ( // from a client connection. const ( - RSID = "RSID" - QRSID = "QRSID" + RSID = "RSID" + QRSID = "QRSID" + RSID_CID_INDEX = 1 RSID_SID_INDEX = 2 EXPECTED_MATCHES = 3 diff --git a/test/routes_test.go b/test/routes_test.go index 8737f4e5..bc3b2f6b 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -18,9 +18,9 @@ func runRouteServer(t *testing.T) (*server.Server, *server.Options) { // Override for running in Go routine. opts.NoSigs = true - //opts.Debug = true - //opts.Trace = true - opts.NoLog = true + opts.Debug = true + opts.Trace = true + //opts.NoLog = true if err != nil { t.Fatalf("Error parsing config file: %v\n", err) @@ -124,8 +124,8 @@ func TestSendRouteSolicit(t *testing.T) { buf := expectResult(t, conn, connectRe) // Check INFO follows. Could be inline, with first result, if not - // check followon buffer. - if !inlineInfoRe.Match(buf) { + // check follow-on buffer. + if !infoRe.Match(buf) { expectResult(t, conn, infoRe) } } @@ -146,10 +146,7 @@ func TestRouteForwardsMsgFromClients(t *testing.T) { expectMsgs := expectMsgsCommand(t, routeExpect) // Eat the CONNECT and INFO protos - buf := routeExpect(connectRe) - if !inlineInfoRe.Match(buf) { - routeExpect(infoRe) - } + routeExpect(infoRe) // Send SUB via route connection routeSend("SUB foo RSID:2:22\r\n") @@ -374,4 +371,32 @@ func TestMultipleRoutesSameId(t *testing.T) { // Nothing on the second. expectNothing(t, route2) -} \ No newline at end of file +} + +func TestRouteResendsLocalSubsOnReconnect(t *testing.T) { + s, opts := runRouteServer(t) + defer s.Shutdown() + + client := createClientConn(t, opts.Host, opts.Port) + clientSend, clientExpect := setupConn(t, client) + + // Setup a local subscription + clientSend("SUB foo 1\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) + + route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + _, routeExpect := setupRouteEx(t, route, opts, "ROUTE:4222") + + // Expect to see the local sub echoed through. + routeExpect(subRe) + + // Close and re-open + route.Close() + + route = createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + _, routeExpect = setupRouteEx(t, route, opts, "ROUTE:4222") + + // Expect to see the local sub echoed through. + routeExpect(subRe) +} diff --git a/test/test.go b/test/test.go index 8e6a6f12..e21dc46b 100644 --- a/test/test.go +++ b/test/test.go @@ -259,16 +259,15 @@ func sendProto(t tLogger, c net.Conn, op string) { } var ( - infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`) - pingRe = regexp.MustCompile(`\APING\r\n`) - pongRe = regexp.MustCompile(`\APONG\r\n`) + infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`) + pingRe = regexp.MustCompile(`PING\r\n`) + pongRe = regexp.MustCompile(`PONG\r\n`) msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) okRe = regexp.MustCompile(`\A\+OK\r\n`) errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`) - subRe = regexp.MustCompile(`\ASUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`) - unsubRe = regexp.MustCompile(`\AUNSUB\s+([^\s]+)(\s+(\d+))?\r\n`) - connectRe = regexp.MustCompile(`\ACONNECT\s+([^\r\n]+)\r\n`) - inlineInfoRe = regexp.MustCompile(`\r\nINFO\s+([^\r\n]+)\r\n`) + subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`) + unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`) + connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`) ) const ( @@ -280,6 +279,8 @@ const ( ) // Reuse expect buffer +// TODO(dlc) - This may be too simplistic in the long run, may need +// to consider holding onto data from previous reads matched by conn. var expBuf = make([]byte, 32768) // Test result from server against regexp