From 3330820502ec18d9b60d8ba8bcbbfe38eabfdcca Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 14 Nov 2019 13:14:56 -0800 Subject: [PATCH] Fixed a bug where we leaked service imports. Also prior this would have leaked subscriptions as well. Signed-off-by: Derek Collison --- server/accounts.go | 7 +++++ server/events.go | 2 +- server/reload.go | 3 ++ server/server.go | 13 +++++---- test/leafnode_test.go | 8 ++++++ test/norace_test.go | 8 +++++- test/service_latency_test.go | 55 ++++++++++++++++++++++++++++++++++++ 7 files changed, 89 insertions(+), 7 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 0aa55283..c60a4b22 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -805,6 +805,13 @@ func (a *Account) AddServiceImport(destination *Account, from, to string) error return a.AddServiceImportWithClaim(destination, from, to, nil) } +// NumServiceImports return number of service imports we have. +func (a *Account) NumServiceImports() int { + a.mu.RLock() + defer a.mu.RUnlock() + return len(a.imports.services) +} + // removeServiceImport will remove the route by subject. func (a *Account) removeServiceImport(subject string) { a.mu.Lock() diff --git a/server/events.go b/server/events.go index cb90c90f..8306fe4f 100644 --- a/server/events.go +++ b/server/events.go @@ -1083,7 +1083,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st m1.merge(&m2) // Make sure we remove the entry here. - si.acc.removeServiceImport(si.from) + acc.removeServiceImport(si.from) // Send the metrics s.sendInternalAccountMsg(acc, lsub, &m1) } diff --git a/server/reload.go b/server/reload.go index cde8767d..4ae0d547 100644 --- a/server/reload.go +++ b/server/reload.go @@ -785,6 +785,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { // Ignore NoLog and NoSigs options since they are not parsed and only used in // testing. continue + case "disableshortfirstping": + newOpts.DisableShortFirstPing = oldValue.(bool) + continue case "maxtracedmsglen": diffOpts = append(diffOpts, &maxTracedMsgLenOption{newValue: newValue.(int)}) case "port": diff --git a/server/server.go b/server/server.go index 71fcaaa7..1184428f 100644 --- a/server/server.go +++ b/server/server.go @@ -2549,12 +2549,15 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool { func (s *Server) setFirstPingTimer(c *client) { opts := s.getOpts() d := opts.PingInterval - if c.kind != CLIENT { - if d > firstPingInterval { - d = firstPingInterval + + if !opts.DisableShortFirstPing { + if c.kind != CLIENT { + if d > firstPingInterval { + d = firstPingInterval + } + } else if d > firstClientPingInterval { + d = firstClientPingInterval } - } else if d > firstClientPingInterval && !opts.DisableShortFirstPing { - d = firstClientPingInterval } // We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. addDelay := rand.Int63n(int64(d / 5)) diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 24d2af75..356061f3 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -547,6 +547,14 @@ func shutdownCluster(c *cluster) { } } +func (c *cluster) totalSubs() int { + totalSubs := 0 + for _, s := range c.servers { + totalSubs += int(s.NumSubscriptions()) + } + return totalSubs +} + // Wait for the expected number of outbound gateways, or fails. func waitForOutboundGateways(t *testing.T, s *server.Server, expected int, timeout time.Duration) { t.Helper() diff --git a/test/norace_test.go b/test/norace_test.go index 93457699..fa2ce069 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -47,10 +47,16 @@ func TestNoRaceRouteSendSubs(t *testing.T) { cfa := createConfFile(t, []byte(fmt.Sprintf(template, ""))) defer os.Remove(cfa) srvA, optsA := RunServerWithConfig(cfa) + srvA.Shutdown() + optsA.DisableShortFirstPing = true + srvA = RunServer(optsA) defer srvA.Shutdown() cfb := createConfFile(t, []byte(fmt.Sprintf(template, ""))) srvB, optsB := RunServerWithConfig(cfb) + srvB.Shutdown() + optsB.DisableShortFirstPing = true + srvB = RunServer(optsB) defer srvB.Shutdown() clientA := createClientConn(t, optsA.Host, optsA.Port) @@ -178,12 +184,12 @@ func TestNoRaceRouteSendSubs(t *testing.T) { replyMsg := fmt.Sprintf("PUB ping.replies %d\r\n%s\r\n", len(payload), payload) for _, s := range senders { go func(s *sender, count int) { + defer wg.Done() for i := 0; i < count; i++ { s.sf(replyMsg) } s.sf("PING\r\n") s.ef(pongRe) - wg.Done() }(s, totalReplies/len(senders)) } diff --git a/test/service_latency_test.go b/test/service_latency_test.go index a23939e1..abda2d0f 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -104,6 +104,14 @@ func (sc *supercluster) removeLatencyTracking(t *testing.T) { } } +func (sc *supercluster) totalSubs() int { + totalSubs := 0 + for _, c := range sc.clusters { + totalSubs += c.totalSubs() + } + return totalSubs +} + func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn { t.Helper() url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port) @@ -400,6 +408,53 @@ func TestServiceLatencySampling(t *testing.T) { } } +func TestServiceLatencyNoSubsLeak(t *testing.T) { + sc := createSuperCluster(t, 3, 3) + defer sc.shutdown() + + // Now add in new service export to FOO and have bar import that with tracking enabled. + sc.setupLatencyTracking(t, 100) + + nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22") + defer nc.Close() + + // The service listener. + nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) { + msg.Respond([]byte("22 msgs")) + }) + nc.Flush() + // Propagation of sub through super cluster. + time.Sleep(100 * time.Millisecond) + + startSubs := sc.totalSubs() + + fooAcc, _ := sc.clusters[1].servers[1].LookupAccount("FOO") + startNumSis := fooAcc.NumServiceImports() + + for i := 0; i < 100; i++ { + nc := clientConnect(t, sc.clusters[1].opts[1], "bar") + if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil { + t.Fatalf("Error on request: %v", err) + } + nc.Close() + } + + checkFor(t, time.Second, 50*time.Millisecond, func() error { + if numSubs := sc.totalSubs(); numSubs != startSubs { + return fmt.Errorf("Leaked %d subs", numSubs-startSubs) + } + return nil + }) + + // Now also check to make sure the service imports created for the request go away as well. + checkFor(t, time.Second, 50*time.Millisecond, func() error { + if numSis := fooAcc.NumServiceImports(); numSis != startNumSis { + return fmt.Errorf("Leaked %d service imports", numSis-startNumSis) + } + return nil + }) +} + func TestServiceLatencyWithName(t *testing.T) { sc := createSuperCluster(t, 1, 1) defer sc.shutdown()