diff --git a/server/accounts_test.go b/server/accounts_test.go index ce42571e..a0fd945d 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1071,7 +1071,7 @@ func TestServiceExportWithWildcards(t *testing.T) { t.Fatalf("Error registering client with 'bar' account: %v", err) } - // Now setup the resonder under cfoo + // Now setup the responder under cfoo cfoo.parse([]byte("SUB ngs.update.* 1\r\n")) // Now send the request. Remember we expect the request on our local ngs.update. @@ -1406,6 +1406,156 @@ func TestAccountRequestReplyTrackLatency(t *testing.T) { } } +func genAsyncFlushParser(c *client) (func(string), chan bool) { + pab := make(chan []byte, 16) + pas := func(cs string) { pab <- []byte(cs) } + quit := make(chan bool) + go func() { + for { + select { + case cs := <-pab: + c.parseAndFlush(cs) + case <-quit: + return + } + } + }() + return pas, quit +} + +// This will test for leaks in the remote latency tracking via client.rrTracking +func TestAccountTrackLatencyRemoteLeaks(t *testing.T) { + optsA, _ := ProcessConfigFile("./configs/seed.conf") + optsA.NoSigs, optsA.NoLog = true, true + srvA := RunServer(optsA) + defer srvA.Shutdown() + optsB := nextServerOpts(optsA) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port)) + srvB := RunServer(optsB) + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + srvs := []*Server{srvA, srvB} + + // Now add in the accounts and setup tracking. + for _, s := range srvs { + s.SetSystemAccount(globalAccountName) + fooAcc, _ := s.RegisterAccount("$foo") + fooAcc.AddServiceExport("track.service", nil) + fooAcc.TrackServiceExport("track.service", "results") + barAcc, _ := s.RegisterAccount("$bar") + if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil { + t.Fatalf("Failed to import: %v", err) + } + } + + // Test with a responder on second server, srvB. but they will not respond. 
+ cfoo, crFoo, _ := newClientForServer(srvB) + defer cfoo.nc.Close() + fooAcc, _ := srvB.LookupAccount("$foo") + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + + // Set new limits + fooAcc.SetAutoExpireTTL(time.Millisecond) + fooAcc.SetMaxAutoExpireResponseMaps(5) + + // Now setup the responder under cfoo and the listener for the results + time.Sleep(50 * time.Millisecond) + baseSubs := int(srvA.NumSubscriptions()) + cfoo.parse([]byte("SUB track.service 1\r\n")) + // Wait for it to propagate. + checkExpectedSubs(t, baseSubs+1, srvA) + + cbar, _, _ := newClientForServer(srvA) + defer cbar.nc.Close() + barAcc, _ := srvA.LookupAccount("$bar") + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + parseAsync, quit := genAsyncFlushParser(cbar) + defer func() { quit <- true }() + + readFooMsg := func() ([]byte, string) { + t.Helper() + l, err := crFoo.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + msg := mraw[0] + msgSize, _ := strconv.Atoi(msg[LEN_INDEX]) + return grabPayload(crFoo, msgSize), msg[REPLY_INDEX] + } + + // Send 2 requests + parseAsync("SUB resp 11\r\nPUB req resp 4\r\nhelp\r\nPUB req resp 4\r\nhelp\r\n") + + readFooMsg() + readFooMsg() + + var rc *client + // Pull out first client + srvB.mu.Lock() + for _, rc = range srvB.clients { + if rc != nil { + break + } + } + srvB.mu.Unlock() + + tracking := func() int { + rc.mu.Lock() + numTracking := len(rc.rrTracking) + rc.mu.Unlock() + return numTracking + } + + numTracking := tracking() + + if numTracking != 2 { + t.Fatalf("Expected to have 2 tracking replies, got %d", numTracking) + } + + // Make sure these remote tracking replies honor the current auto expire TTL. 
+ time.Sleep(2 * time.Millisecond) + + rc.mu.Lock() + rc.pruneRemoteTracking() + numTracking = len(rc.rrTracking) + rc.mu.Unlock() + + if numTracking != 0 { + t.Fatalf("Expected to have no more tracking replies, got %d", numTracking) + } + + // Test that we trigger on max. + for i := 0; i < 4; i++ { + parseAsync("PUB req resp 4\r\nhelp\r\n") + readFooMsg() + } + + if numTracking = tracking(); numTracking != 4 { + t.Fatalf("Expected to have 4 tracking replies, got %d", numTracking) + } + + // Make sure they will be expired. + time.Sleep(2 * time.Millisecond) + + // Should trigger here + parseAsync("PUB req resp 4\r\nhelp\r\n") + readFooMsg() + + if numTracking = tracking(); numTracking != 1 { + t.Fatalf("Expected to have 1 tracking reply, got %d", numTracking) + } +} + func TestCrossAccountRequestReplyResponseMaps(t *testing.T) { s, fooAcc, barAcc := simpleAccountServer(t) defer s.Shutdown() diff --git a/server/client.go b/server/client.go index db5664ef..0681bb01 100644 --- a/server/client.go +++ b/server/client.go @@ -192,6 +192,7 @@ type client struct { rtt time.Duration rttStart time.Time rrTracking map[string]*remoteLatency + rrMax int route *route gw *gateway @@ -2227,7 +2228,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { // FIXME(dlc) - We may need to optimize this. if client.acc.IsExportServiceTracking(string(c.pa.subject)) { // If we do not have a registered RTT queue that up now. - if client.rtt == 0 { + if client.rtt == 0 && c.flags.isSet(connectReceived) { client.sendPing() } // We will have tagged this with a suffix ('.T') if we are tracking. 
This is @@ -2284,6 +2285,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { func (c *client) trackRemoteReply(reply string) { if c.rrTracking == nil { c.rrTracking = make(map[string]*remoteLatency) + c.rrMax = c.acc.MaxAutoExpireResponseMaps() } rl := remoteLatency{ Account: c.acc.Name, @@ -2291,6 +2293,9 @@ func (c *client) trackRemoteReply(reply string) { } rl.M2.RequestStart = time.Now() c.rrTracking[reply] = &rl + if len(c.rrTracking) >= c.rrMax { + c.pruneRemoteTracking() + } } // pruneReplyPerms will remove any stale or expired entries @@ -2340,6 +2345,20 @@ func (c *client) prunePubPermsCache() { } } +// pruneRemoteTracking will prune any remote tracking objects +// that are too old. These are orphaned when a service is not +// sending responses etc. +// Lock should be held upon entry. +func (c *client) pruneRemoteTracking() { + ttl := c.acc.AutoExpireTTL() + now := time.Now() + for reply, rl := range c.rrTracking { + if now.Sub(rl.M2.RequestStart) > ttl { + delete(c.rrTracking, reply) + } + } +} + // pubAllowed checks on publish permissioning. // Lock should not be held. func (c *client) pubAllowed(subject string) bool {