From 4ea3f9c57ebfbfb089a251119df1e36f631995e6 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 8 Jul 2020 14:19:32 -0600 Subject: [PATCH] [FIXED] Handling or real duplicate subscription That is, if the server receives "SUB foo 1" more than once from the same client, we would register in the client map this subscription only once, and add to the account's sublist only once, however we would have updated shadow subscriptions and route/gateway maps for each SUB protocol, which would result in inability to send unsubscribe to routes when the client goes away or unsubscribes. Signed-off-by: Ivan Kozlovic --- server/client.go | 8 +++++- test/new_routes_test.go | 60 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/server/client.go b/server/client.go index 5a115f72..2af3928f 100644 --- a/server/client.go +++ b/server/client.go @@ -2165,7 +2165,8 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) var err error // Subscribe here. - if c.subs[sid] == nil { + es := c.subs[sid] + if es == nil { c.subs[sid] = sub if acc != nil && acc.sl != nil { err = acc.sl.Insert(sub) @@ -2186,6 +2187,11 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) c.sendOK() } + // If it was already registered, return it. + if es != nil { + return es, nil + } + // No account just return. if acc == nil { return sub, nil diff --git a/test/new_routes_test.go b/test/new_routes_test.go index fbd1002b..8356eff5 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -1118,6 +1118,11 @@ func addServiceExport(subject string, authorized []*server.Account, targets ...* var isPublic = []*server.Account(nil) func TestNewRouteStreamImport(t *testing.T) { + testNewRouteStreamImport(t, false) +} + +func testNewRouteStreamImport(t *testing.T, duplicateSub bool) { + t.Helper() srvA, srvB, optsA, optsB := runServers(t) defer srvA.Shutdown() defer srvB.Shutdown() @@ -1143,7 +1148,11 @@ func TestNewRouteStreamImport(t *testing.T) { defer clientB.Close() sendB, expectB := setupConnWithAccount(t, clientB, "$bar") - sendB("SUB foo 1\r\nPING\r\n") + sendB("SUB foo 1\r\n") + if duplicateSub { + sendB("SUB foo 1\r\n") + } + sendB("PING\r\n") expectB(pongRe) // The subscription on "foo" for account $bar will also become @@ -1833,3 +1842,52 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) { t.Fatalf("Should not have received the message on bar") } } + +// Check that real duplicate subscription (that is, sent by client with same sid) +// are ignored and do not register multiple shadow subscriptions. +func TestNewRouteDuplicateSubscription(t *testing.T) { + // This is same test than TestNewRouteStreamImport but calling "SUB foo 1" twice. + testNewRouteStreamImport(t, true) + + opts := LoadConfig("./configs/new_cluster.conf") + opts.DisableShortFirstPing = true + s := RunServer(opts) + defer s.Shutdown() + + rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port) + defer rc.Close() + + routeID := "RTEST_DUPLICATE:22" + routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID) + info := checkInfoMsg(t, rc) + info.ID = routeID + b, err := json.Marshal(info) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + routeSend(fmt.Sprintf("INFO %s\r\n", b)) + routeSend("PING\r\n") + routeExpect(pongRe) + + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + send, expect := setupConn(t, c) + + // Create a real duplicate subscriptions (same sid) + send("SUB foo 1\r\nSUB foo 1\r\nPING\r\n") + expect(pongRe) + + // Route should receive single RS+ + routeExpect(rsubRe) + + // Unsubscribe. + send("UNSUB 1\r\nPING\r\n") + expect(pongRe) + + // Route should receive RS-. + // With defect, only 1 subscription would be found during the unsubscribe, + // however route map would have been updated twice when processing the + // duplicate SUB, which means that the RS- would not be received because + // the count would still be 1. + routeExpect(runsubRe) +}