Prune remote reply tracking

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-08-30 15:39:47 -07:00
parent bb11f7bd2d
commit 67470911fe
2 changed files with 171 additions and 2 deletions

View File

@@ -1071,7 +1071,7 @@ func TestServiceExportWithWildcards(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
// Now setup the resonder under cfoo
// Now setup the responder under cfoo
cfoo.parse([]byte("SUB ngs.update.* 1\r\n"))
// Now send the request. Remember we expect the request on our local ngs.update.
@@ -1406,6 +1406,156 @@ func TestAccountRequestReplyTrackLatency(t *testing.T) {
}
}
func genAsyncFlushParser(c *client) (func(string), chan bool) {
pab := make(chan []byte, 16)
pas := func(cs string) { pab <- []byte(cs) }
quit := make(chan bool)
go func() {
for {
select {
case cs := <-pab:
c.parseAndFlush(cs)
case <-quit:
return
}
}
}()
return pas, quit
}
// This will test for leaks in the remote latency tracking via client.rrTracking
func TestAccountTrackLatencyRemoteLeaks(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
srvA := RunServer(optsA)
defer srvA.Shutdown()
optsB := nextServerOpts(optsA)
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
srvs := []*Server{srvA, srvB}
// Now add in the accounts and setup tracking.
for _, s := range srvs {
s.SetSystemAccount(globalAccountName)
fooAcc, _ := s.RegisterAccount("$foo")
fooAcc.AddServiceExport("track.service", nil)
fooAcc.TrackServiceExport("track.service", "results")
barAcc, _ := s.RegisterAccount("$bar")
if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil {
t.Fatalf("Failed to import: %v", err)
}
}
// Test with a responder on second server, srvB. but they will not respond.
cfoo, crFoo, _ := newClientForServer(srvB)
defer cfoo.nc.Close()
fooAcc, _ := srvB.LookupAccount("$foo")
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
// Set new limits
fooAcc.SetAutoExpireTTL(time.Millisecond)
fooAcc.SetMaxAutoExpireResponseMaps(5)
// Now setup the resonder under cfoo and the listener for the results
time.Sleep(50 * time.Millisecond)
baseSubs := int(srvA.NumSubscriptions())
cfoo.parse([]byte("SUB track.service 1\r\n"))
// Wait for it to propagate.
checkExpectedSubs(t, baseSubs+1, srvA)
cbar, _, _ := newClientForServer(srvA)
defer cbar.nc.Close()
barAcc, _ := srvA.LookupAccount("$bar")
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
parseAsync, quit := genAsyncFlushParser(cbar)
defer func() { quit <- true }()
readFooMsg := func() ([]byte, string) {
t.Helper()
l, err := crFoo.ReadString('\n')
if err != nil {
t.Fatalf("Error reading from client 'bar': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
msg := mraw[0]
msgSize, _ := strconv.Atoi(msg[LEN_INDEX])
return grabPayload(crFoo, msgSize), msg[REPLY_INDEX]
}
// Send 2 requests
parseAsync("SUB resp 11\r\nPUB req resp 4\r\nhelp\r\nPUB req resp 4\r\nhelp\r\n")
readFooMsg()
readFooMsg()
var rc *client
// Pull out first client
srvB.mu.Lock()
for _, rc = range srvB.clients {
if rc != nil {
break
}
}
srvB.mu.Unlock()
tracking := func() int {
rc.mu.Lock()
numTracking := len(rc.rrTracking)
rc.mu.Unlock()
return numTracking
}
numTracking := tracking()
if numTracking != 2 {
t.Fatalf("Expected to have 2 tracking replies, got %d", numTracking)
}
// Make sure these remote tracking replies honor the current auto expire TTL.
time.Sleep(2 * time.Millisecond)
rc.mu.Lock()
rc.pruneRemoteTracking()
numTracking = len(rc.rrTracking)
rc.mu.Unlock()
if numTracking != 0 {
t.Fatalf("Expected to have no more tracking replies, got %d", numTracking)
}
// Test that we trigger on max.
for i := 0; i < 4; i++ {
parseAsync("PUB req resp 4\r\nhelp\r\n")
readFooMsg()
}
if numTracking = tracking(); numTracking != 4 {
t.Fatalf("Expected to have 4 tracking replies, got %d", numTracking)
}
// Make sure they will be expired.
time.Sleep(2 * time.Millisecond)
// Should trigger here
parseAsync("PUB req resp 4\r\nhelp\r\n")
readFooMsg()
if numTracking = tracking(); numTracking != 1 {
t.Fatalf("Expected to have 1 tracking reply, got %d", numTracking)
}
}
func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()

View File

@@ -192,6 +192,7 @@ type client struct {
rtt time.Duration
rttStart time.Time
rrTracking map[string]*remoteLatency
rrMax int
route *route
gw *gateway
@@ -2227,7 +2228,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
// FIXME(dlc) - We may need to optimize this.
if client.acc.IsExportServiceTracking(string(c.pa.subject)) {
// If we do not have a registered RTT queue that up now.
if client.rtt == 0 {
if client.rtt == 0 && c.flags.isSet(connectReceived) {
client.sendPing()
}
// We will have tagged this with a suffix ('.T') if we are tracking. This is
@@ -2284,6 +2285,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
func (c *client) trackRemoteReply(reply string) {
if c.rrTracking == nil {
c.rrTracking = make(map[string]*remoteLatency)
c.rrMax = c.acc.MaxAutoExpireResponseMaps()
}
rl := remoteLatency{
Account: c.acc.Name,
@@ -2291,6 +2293,9 @@ func (c *client) trackRemoteReply(reply string) {
}
rl.M2.RequestStart = time.Now()
c.rrTracking[reply] = &rl
if len(c.rrTracking) >= c.rrMax {
c.pruneRemoteTracking()
}
}
// pruneReplyPerms will remove any stale or expired entries
@@ -2340,6 +2345,20 @@ func (c *client) prunePubPermsCache() {
}
}
// pruneRemoteTracking will prune any remote tracking objects
// that are too old. These are orphaned when a service is not
// sending reponses etc.
// Lock should be held upon entry.
func (c *client) pruneRemoteTracking() {
ttl := c.acc.AutoExpireTTL()
now := time.Now()
for reply, rl := range c.rrTracking {
if now.Sub(rl.M2.RequestStart) > ttl {
delete(c.rrTracking, reply)
}
}
}
// pubAllowed checks on publish permissioning.
// Lock should not be held.
func (c *client) pubAllowed(subject string) bool {