diff --git a/server/client.go b/server/client.go index 5a115f72..2af3928f 100644 --- a/server/client.go +++ b/server/client.go @@ -2165,7 +2165,8 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) var err error // Subscribe here. - if c.subs[sid] == nil { + es := c.subs[sid] + if es == nil { c.subs[sid] = sub if acc != nil && acc.sl != nil { err = acc.sl.Insert(sub) @@ -2186,6 +2187,11 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) c.sendOK() } + // If it was already registered, return it. + if es != nil { + return es, nil + } + // No account just return. if acc == nil { return sub, nil diff --git a/test/new_routes_test.go b/test/new_routes_test.go index fbd1002b..8356eff5 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -1118,6 +1118,11 @@ func addServiceExport(subject string, authorized []*server.Account, targets ...* var isPublic = []*server.Account(nil) func TestNewRouteStreamImport(t *testing.T) { + testNewRouteStreamImport(t, false) +} + +func testNewRouteStreamImport(t *testing.T, duplicateSub bool) { + t.Helper() srvA, srvB, optsA, optsB := runServers(t) defer srvA.Shutdown() defer srvB.Shutdown() @@ -1143,7 +1148,11 @@ func TestNewRouteStreamImport(t *testing.T) { defer clientB.Close() sendB, expectB := setupConnWithAccount(t, clientB, "$bar") - sendB("SUB foo 1\r\nPING\r\n") + sendB("SUB foo 1\r\n") + if duplicateSub { + sendB("SUB foo 1\r\n") + } + sendB("PING\r\n") expectB(pongRe) // The subscription on "foo" for account $bar will also become @@ -1833,3 +1842,52 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) { t.Fatalf("Should not have received the message on bar") } } + +// Check that real duplicate subscription (that is, sent by client with same sid) +// are ignored and do not register multiple shadow subscriptions. +func TestNewRouteDuplicateSubscription(t *testing.T) { + // This is same test than TestNewRouteStreamImport but calling "SUB foo 1" twice. + testNewRouteStreamImport(t, true) + + opts := LoadConfig("./configs/new_cluster.conf") + opts.DisableShortFirstPing = true + s := RunServer(opts) + defer s.Shutdown() + + rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port) + defer rc.Close() + + routeID := "RTEST_DUPLICATE:22" + routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID) + info := checkInfoMsg(t, rc) + info.ID = routeID + b, err := json.Marshal(info) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + routeSend(fmt.Sprintf("INFO %s\r\n", b)) + routeSend("PING\r\n") + routeExpect(pongRe) + + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + send, expect := setupConn(t, c) + + // Create a real duplicate subscriptions (same sid) + send("SUB foo 1\r\nSUB foo 1\r\nPING\r\n") + expect(pongRe) + + // Route should receive single RS+ + routeExpect(rsubRe) + + // Unsubscribe. + send("UNSUB 1\r\nPING\r\n") + expect(pongRe) + + // Route should receive RS-. + // With defect, only 1 subscription would be found during the unsubscribe, + // however route map would have been updated twice when processing the + // duplicate SUB, which means that the RS- would not be received because + // the count would still be 1. + routeExpect(runsubRe) +}