Send local subs, tests cleanups

This commit is contained in:
Derek Collison
2013-07-30 09:52:46 -07:00
parent b136f41fdb
commit 4e2ee9425d
3 changed files with 65 additions and 20 deletions

View File

@@ -52,6 +52,20 @@ func (c *client) sendConnect() {
c.bw.Flush()
}
func (s *Server) sendLocalSubsToRoute(route *client) {
for _, client := range s.clients {
for _, s := range client.subs.All() {
if sub, ok := s.(*subscription); ok {
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
route.bw.WriteString(proto)
}
}
}
route.bw.Flush()
Debug("Route sent local subscriptions", clientConnStr(route.nc), route.cid)
}
func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client {
didSolicit := rUrl != nil
r := &route{didSolicit: didSolicit}
@@ -65,13 +79,14 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client {
// Queue Connect proto if we solicited the connection.
if didSolicit {
r.url = rUrl
Debug("Route connect msg sent", clientConnStr(c.nc), c.cid)
c.sendConnect()
}
// Send our info to the other side.
s.sendInfo(c)
// Check for Auth
// Check for Auth required state for incoming connections.
if s.routeInfo.AuthRequired && !didSolicit {
ttl := secondsToDuration(s.opts.ClusterAuthTimeout)
c.setAuthTimer(ttl)
@@ -82,6 +97,9 @@ func (s *Server) createRoute(conn net.Conn, rUrl *url.URL) *client {
s.routes[c.cid] = c
s.mu.Unlock()
// Send our local subscriptions to this route.
s.sendLocalSubsToRoute(c)
return c
}
@@ -100,8 +118,9 @@ const (
// from a client connection.
const (
RSID = "RSID"
QRSID = "QRSID"
RSID = "RSID"
QRSID = "QRSID"
RSID_CID_INDEX = 1
RSID_SID_INDEX = 2
EXPECTED_MATCHES = 3

View File

@@ -18,9 +18,9 @@ func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
// Override for running in Go routine.
opts.NoSigs = true
//opts.Debug = true
//opts.Trace = true
opts.NoLog = true
opts.Debug = true
opts.Trace = true
//opts.NoLog = true
if err != nil {
t.Fatalf("Error parsing config file: %v\n", err)
@@ -124,8 +124,8 @@ func TestSendRouteSolicit(t *testing.T) {
buf := expectResult(t, conn, connectRe)
// Check INFO follows. Could be inline, with first result, if not
// check followon buffer.
if !inlineInfoRe.Match(buf) {
// check follow-on buffer.
if !infoRe.Match(buf) {
expectResult(t, conn, infoRe)
}
}
@@ -146,10 +146,7 @@ func TestRouteForwardsMsgFromClients(t *testing.T) {
expectMsgs := expectMsgsCommand(t, routeExpect)
// Eat the CONNECT and INFO protos
buf := routeExpect(connectRe)
if !inlineInfoRe.Match(buf) {
routeExpect(infoRe)
}
routeExpect(infoRe)
// Send SUB via route connection
routeSend("SUB foo RSID:2:22\r\n")
@@ -374,4 +371,32 @@ func TestMultipleRoutesSameId(t *testing.T) {
// Nothing on the second.
expectNothing(t, route2)
}
}
func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
client := createClientConn(t, opts.Host, opts.Port)
clientSend, clientExpect := setupConn(t, client)
// Setup a local subscription
clientSend("SUB foo 1\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)
route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
_, routeExpect := setupRouteEx(t, route, opts, "ROUTE:4222")
// Expect to see the local sub echoed through.
routeExpect(subRe)
// Close and re-open
route.Close()
route = createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
_, routeExpect = setupRouteEx(t, route, opts, "ROUTE:4222")
// Expect to see the local sub echoed through.
routeExpect(subRe)
}

View File

@@ -259,16 +259,15 @@ func sendProto(t tLogger, c net.Conn, op string) {
}
var (
infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`\APING\r\n`)
pongRe = regexp.MustCompile(`\APONG\r\n`)
infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`PING\r\n`)
pongRe = regexp.MustCompile(`PONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`\ASUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`\AUNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
connectRe = regexp.MustCompile(`\ACONNECT\s+([^\r\n]+)\r\n`)
inlineInfoRe = regexp.MustCompile(`\r\nINFO\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`)
)
const (
@@ -280,6 +279,8 @@ const (
)
// Reuse expect buffer
// TODO(dlc) - This may be too simplistic in the long run, may need
// to consider holding onto data from previous reads matched by conn.
var expBuf = make([]byte, 32768)
// Test result from server against regexp