Don't send route unsub with max

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-06-01 15:40:39 -06:00
parent 049db6e854
commit 26dafe464b
4 changed files with 27 additions and 74 deletions

View File

@@ -148,7 +148,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
var qsubs []*subscription
for _, qs := range r.qsubs {
if len(qs) != 0 && bytes.Compare(group, qs[0].queue) == 0 {
if len(qs) != 0 && bytes.Equal(group, qs[0].queue) {
qsubs = qs
break
}
@@ -182,7 +182,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
rsub = sub
continue
}
mh := c.msgHeader(msgh[:len(msgh)], sub)
mh := c.msgHeader(msgh[:], sub)
if c.deliverMsg(sub, mh, msg) {
c.Debugf("Redelivery succeeded for message on group '%q'", group)
return
@@ -191,7 +191,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
// If we are here we failed to find a local, see if we snapshotted a
// remote sub, and if so deliver to that.
if rsub != nil {
mh := c.msgHeader(msgh[:len(msgh)], rsub)
mh := c.msgHeader(msgh[:], rsub)
if c.deliverMsg(rsub, mh, msg) {
c.Debugf("Re-routing message on group '%q' to remote server", group)
return
@@ -639,7 +639,7 @@ const (
const (
subProto = "SUB %s %s %s" + _CRLF_
unsubProto = "UNSUB %s%s" + _CRLF_
unsubProto = "UNSUB %s" + _CRLF_
)
// FIXME(dlc) - Make these reserved and reject if they come in as a sid
@@ -820,15 +820,15 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
if s.numRoutes() == 0 {
return
}
rsid := routeSid(sub)
maxStr := _EMPTY_
sub.client.mu.Lock()
// Set max if we have it set and have not tripped auto-unsubscribe
if sub.max > 0 && sub.nm < sub.max {
maxStr = fmt.Sprintf(" %d", sub.max)
}
// Max has no meaning on the other side of a route, so do not send.
hasMax := sub.max > 0 && sub.nm < sub.max
sub.client.mu.Unlock()
proto := fmt.Sprintf(unsubProto, rsid, maxStr)
if hasMax {
return
}
rsid := routeSid(sub)
proto := fmt.Sprintf(unsubProto, rsid)
s.broadcastInterestToRoutes(proto)
}

View File

@@ -950,12 +950,12 @@ func TestRoutedQueueAutoUnsubscribe(t *testing.T) {
if nbar == expected && nbaz == expected {
time.Sleep(500 * time.Millisecond)
// Now check all mappings are gone.
srvA.mu.Lock()
srvA.rqsMu.RLock()
nrqsa := len(srvA.rqsubs)
srvA.mu.Unlock()
srvB.mu.Lock()
srvA.rqsMu.RUnlock()
srvB.rqsMu.RLock()
nrqsb := len(srvB.rqsubs)
srvB.mu.Unlock()
srvB.rqsMu.RUnlock()
if nrqsa != 0 || nrqsb != 0 {
t.Fatalf("Expected rqs mappings to have cleared, but got A:%d, B:%d\n",
nrqsa, nrqsb)

View File

@@ -544,55 +544,10 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
routeExpect(subRe)
}
func TestAutoUnsubPropagation(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
type ignoreLogger struct{}
client := createClientConn(t, opts.Host, opts.Port)
defer client.Close()
clientSend, clientExpect := setupConn(t, client)
route := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer route.Close()
expectAuthRequired(t, route)
routeSend, routeExpect := setupRouteEx(t, route, opts, "ROUTER:xyz")
routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n")
// Setup a local subscription
clientSend("SUB foo 2\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)
routeExpect(subRe)
clientSend("UNSUB 2 1\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)
routeExpect(unsubmaxRe)
clientSend("PUB foo 2\r\nok\r\n")
clientExpect(msgRe)
clientSend("PING\r\n")
clientExpect(pongRe)
clientSend("UNSUB 2\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)
routeExpect(unsubnomaxRe)
}
type ignoreLogger struct {
}
func (l *ignoreLogger) Fatalf(f string, args ...interface{}) {
}
func (l *ignoreLogger) Errorf(f string, args ...interface{}) {
}
func (l *ignoreLogger) Fatalf(f string, args ...interface{}) {}
func (l *ignoreLogger) Errorf(f string, args ...interface{}) {}
func TestRouteConnectOnShutdownRace(t *testing.T) {
s, opts := runRouteServer(t)

View File

@@ -244,17 +244,15 @@ func sendProto(t tLogger, c net.Conn, op string) {
}
var (
infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`PING\r\n`)
pongRe = regexp.MustCompile(`PONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
unsubmaxRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))\r\n`)
unsubnomaxRe = regexp.MustCompile(`UNSUB\s+([^\s]+)\r\n`)
connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`)
infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`PING\r\n`)
pongRe = regexp.MustCompile(`PONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`)
)
const (