[FIXED] Handling or real duplicate subscription

That is, if the server receives "SUB foo 1" more than once from
the same client, we would register in the client map this subscription
only once, and add to the account's sublist only once, however we
would have updated shadow subscriptions and route/gateway maps for
each SUB protocol, which would result in inability to send unsubscribe
to routes when the client goes away or unsubscribes.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-07-08 14:19:32 -06:00
parent 735e06b9b9
commit 4ea3f9c57e
2 changed files with 66 additions and 2 deletions

View File

@@ -2165,7 +2165,8 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error)
var err error
// Subscribe here.
if c.subs[sid] == nil {
es := c.subs[sid]
if es == nil {
c.subs[sid] = sub
if acc != nil && acc.sl != nil {
err = acc.sl.Insert(sub)
@@ -2186,6 +2187,11 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error)
c.sendOK()
}
// If it was already registered, return it.
if es != nil {
return es, nil
}
// No account just return.
if acc == nil {
return sub, nil

View File

@@ -1118,6 +1118,11 @@ func addServiceExport(subject string, authorized []*server.Account, targets ...*
var isPublic = []*server.Account(nil)
func TestNewRouteStreamImport(t *testing.T) {
testNewRouteStreamImport(t, false)
}
func testNewRouteStreamImport(t *testing.T, duplicateSub bool) {
t.Helper()
srvA, srvB, optsA, optsB := runServers(t)
defer srvA.Shutdown()
defer srvB.Shutdown()
@@ -1143,7 +1148,11 @@ func TestNewRouteStreamImport(t *testing.T) {
defer clientB.Close()
sendB, expectB := setupConnWithAccount(t, clientB, "$bar")
sendB("SUB foo 1\r\nPING\r\n")
sendB("SUB foo 1\r\n")
if duplicateSub {
sendB("SUB foo 1\r\n")
}
sendB("PING\r\n")
expectB(pongRe)
// The subscription on "foo" for account $bar will also become
@@ -1833,3 +1842,52 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) {
t.Fatalf("Should not have received the message on bar")
}
}
// Check that real duplicate subscription (that is, sent by client with same sid)
// are ignored and do not register multiple shadow subscriptions.
func TestNewRouteDuplicateSubscription(t *testing.T) {
// This is same test than TestNewRouteStreamImport but calling "SUB foo 1" twice.
testNewRouteStreamImport(t, true)
opts := LoadConfig("./configs/new_cluster.conf")
opts.DisableShortFirstPing = true
s := RunServer(opts)
defer s.Shutdown()
rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer rc.Close()
routeID := "RTEST_DUPLICATE:22"
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
info := checkInfoMsg(t, rc)
info.ID = routeID
b, err := json.Marshal(info)
if err != nil {
t.Fatalf("Could not marshal test route info: %v", err)
}
routeSend(fmt.Sprintf("INFO %s\r\n", b))
routeSend("PING\r\n")
routeExpect(pongRe)
c := createClientConn(t, opts.Host, opts.Port)
defer c.Close()
send, expect := setupConn(t, c)
// Create a real duplicate subscriptions (same sid)
send("SUB foo 1\r\nSUB foo 1\r\nPING\r\n")
expect(pongRe)
// Route should receive single RS+
routeExpect(rsubRe)
// Unsubscribe.
send("UNSUB 1\r\nPING\r\n")
expect(pongRe)
// Route should receive RS-.
// With defect, only 1 subscription would be found during the unsubscribe,
// however route map would have been updated twice when processing the
// duplicate SUB, which means that the RS- would not be received because
// the count would still be 1.
routeExpect(runsubRe)
}