Merge pull request #1185 from nats-io/leaks

Fixed a bug where we leaked service imports.
This commit is contained in:
Derek Collison
2019-11-14 13:31:54 -08:00
committed by GitHub
7 changed files with 89 additions and 7 deletions

View File

@@ -805,6 +805,13 @@ func (a *Account) AddServiceImport(destination *Account, from, to string) error
return a.AddServiceImportWithClaim(destination, from, to, nil)
}
// NumServiceImports returns the number of service imports we have.
func (a *Account) NumServiceImports() int {
	a.mu.RLock()
	n := len(a.imports.services)
	a.mu.RUnlock()
	return n
}
// removeServiceImport will remove the service import by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()

View File

@@ -1083,7 +1083,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
m1.merge(&m2)
// Make sure we remove the entry here.
si.acc.removeServiceImport(si.from)
acc.removeServiceImport(si.from)
// Send the metrics
s.sendInternalAccountMsg(acc, lsub, &m1)
}

View File

@@ -785,6 +785,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
// Ignore NoLog and NoSigs options since they are not parsed and only used in
// testing.
continue
case "disableshortfirstping":
newOpts.DisableShortFirstPing = oldValue.(bool)
continue
case "maxtracedmsglen":
diffOpts = append(diffOpts, &maxTracedMsgLenOption{newValue: newValue.(int)})
case "port":

View File

@@ -2549,12 +2549,15 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
func (s *Server) setFirstPingTimer(c *client) {
opts := s.getOpts()
d := opts.PingInterval
if c.kind != CLIENT {
if d > firstPingInterval {
d = firstPingInterval
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {
d = firstPingInterval
}
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
} else if d > firstClientPingInterval && !opts.DisableShortFirstPing {
d = firstClientPingInterval
}
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))

View File

@@ -547,6 +547,14 @@ func shutdownCluster(c *cluster) {
}
}
// totalSubs returns the combined subscription count across every server
// in the cluster.
func (c *cluster) totalSubs() int {
	var subs int
	for _, srv := range c.servers {
		subs += int(srv.NumSubscriptions())
	}
	return subs
}
// waitForOutboundGateways waits for the expected number of outbound gateways, or fails the test.
func waitForOutboundGateways(t *testing.T, s *server.Server, expected int, timeout time.Duration) {
t.Helper()

View File

@@ -47,10 +47,16 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
cfa := createConfFile(t, []byte(fmt.Sprintf(template, "")))
defer os.Remove(cfa)
srvA, optsA := RunServerWithConfig(cfa)
srvA.Shutdown()
optsA.DisableShortFirstPing = true
srvA = RunServer(optsA)
defer srvA.Shutdown()
cfb := createConfFile(t, []byte(fmt.Sprintf(template, "")))
srvB, optsB := RunServerWithConfig(cfb)
srvB.Shutdown()
optsB.DisableShortFirstPing = true
srvB = RunServer(optsB)
defer srvB.Shutdown()
clientA := createClientConn(t, optsA.Host, optsA.Port)
@@ -178,12 +184,12 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
replyMsg := fmt.Sprintf("PUB ping.replies %d\r\n%s\r\n", len(payload), payload)
for _, s := range senders {
go func(s *sender, count int) {
defer wg.Done()
for i := 0; i < count; i++ {
s.sf(replyMsg)
}
s.sf("PING\r\n")
s.ef(pongRe)
wg.Done()
}(s, totalReplies/len(senders))
}

View File

@@ -104,6 +104,14 @@ func (sc *supercluster) removeLatencyTracking(t *testing.T) {
}
}
// totalSubs returns the combined subscription count across all clusters
// in the super cluster.
func (sc *supercluster) totalSubs() int {
	var subs int
	for _, cl := range sc.clusters {
		subs += cl.totalSubs()
	}
	return subs
}
func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn {
t.Helper()
url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
@@ -400,6 +408,53 @@ func TestServiceLatencySampling(t *testing.T) {
}
}
// TestServiceLatencyNoSubsLeak verifies that request/reply traffic through a
// tracked service export does not leak subscriptions or service imports.
func TestServiceLatencyNoSubsLeak(t *testing.T) {
	sc := createSuperCluster(t, 3, 3)
	defer sc.shutdown()

	// Add a service export to FOO and have bar import it with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})
	nc.Flush()

	// Give the subscription time to propagate through the super cluster.
	time.Sleep(100 * time.Millisecond)

	// Capture baselines before generating request traffic.
	baseSubs := sc.totalSubs()
	fooAcc, _ := sc.clusters[1].servers[1].LookupAccount("FOO")
	baseSis := fooAcc.NumServiceImports()

	// Issue a burst of requests, each from a fresh connection.
	for i := 0; i < 100; i++ {
		rnc := clientConnect(t, sc.clusters[1].opts[1], "bar")
		if _, err := rnc.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Error on request: %v", err)
		}
		rnc.Close()
	}

	// Subscription counts should settle back to the baseline.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if numSubs := sc.totalSubs(); numSubs != baseSubs {
			return fmt.Errorf("Leaked %d subs", numSubs-baseSubs)
		}
		return nil
	})
	// The service imports created for the requests should go away as well.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if numSis := fooAcc.NumServiceImports(); numSis != baseSis {
			return fmt.Errorf("Leaked %d service imports", numSis-baseSis)
		}
		return nil
	})
}
func TestServiceLatencyWithName(t *testing.T) {
sc := createSuperCluster(t, 1, 1)
defer sc.shutdown()