diff --git a/.travis.yml b/.travis.yml index 2ca92a7d..e2d30702 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,9 +23,11 @@ before_script: - misspell -error -locale US $EXCLUDE_VENDOR - staticcheck $EXCLUDE_VENDOR script: +- set -e - go test -i $EXCLUDE_VENDOR -- go test -run=TestNoRace $EXCLUDE_VENDOR +- go test -run=TestNoRace --failfast -p=1 $EXCLUDE_VENDOR - if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast $EXCLUDE_VENDOR; fi +- set +e deploy: provider: script diff --git a/server/accounts_test.go b/server/accounts_test.go index e1cbeb25..7b5d24ae 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1496,22 +1496,32 @@ func TestCrossAccountServiceResponseLeaks(t *testing.T) { // Now send some requests..We will not respond. var sb strings.Builder - for i := 0; i < 100; i++ { + for i := 0; i < 50; i++ { sb.WriteString(fmt.Sprintf("PUB foo REPLY.%d 4\r\nhelp\r\n", i)) } go cbar.parseAndFlush([]byte(sb.String())) // Make sure requests are processed. - _, err := crFoo.ReadString('\n') - if err != nil { + if _, err := crFoo.ReadString('\n'); err != nil { t.Fatalf("Error reading from client 'bar': %v", err) } // We should have leaked response maps. - if nr := fooAcc.numServiceRoutes(); nr != 100 { + if nr := fooAcc.numServiceRoutes(); nr != 50 { t.Fatalf("Expected response maps to be present, got %d", nr) } + sb.Reset() + for i := 50; i < 100; i++ { + sb.WriteString(fmt.Sprintf("PUB foo REPLY.%d 4\r\nhelp\r\n", i)) + } + go cbar.parseAndFlush([]byte(sb.String())) + + // Make sure requests are processed. + if _, err := crFoo.ReadString('\n'); err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + // They should be gone here eventually. 
checkFor(t, time.Second, 10*time.Millisecond, func() error { if nr := fooAcc.numServiceRoutes(); nr != 0 { diff --git a/server/client.go b/server/client.go index 830d6237..29e9156b 100644 --- a/server/client.go +++ b/server/client.go @@ -280,7 +280,6 @@ type readCache struct { // This is for routes and gateways to have their own L1 as well that is account aware. pacache map[string]*perAccountCache - losc int64 // last orphan subs check // This is for when we deliver messages across a route. We use this structure // to make sure to only send one message and properly scope to queues as needed. @@ -300,13 +299,13 @@ type readCache struct { const ( defaultMaxPerAccountCacheSize = 4096 defaultPrunePerAccountCacheSize = 256 - defaultOrphanSubsCheckInterval = int64(5 * 60) //5 min in number of seconds + defaultClosedSubsCheckInterval = 5 * time.Minute ) var ( maxPerAccountCacheSize = defaultMaxPerAccountCacheSize prunePerAccountCacheSize = defaultPrunePerAccountCacheSize - orphanSubsCheckInterval = defaultOrphanSubsCheckInterval + closedSubsCheckInterval = defaultClosedSubsCheckInterval ) // perAccountCache is for L1 semantics for inbound messages from a route or gateway to mimic the performance of clients. @@ -340,27 +339,28 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { // FIXME(dlc) - This is getting bloated for normal subs, need // to optionally have an opts section for non-normal stuff. type subscription struct { - nm int64 // Will atomically be set to -1 on unsub or connection close client *client im *streamImport // This is for import stream support. shadow []*subscription // This is to track shadowed accounts. subject []byte queue []byte sid []byte + nm int64 max int64 qw int32 + closed int32 } // Indicate that this subscription is closed. // This is used in pruning of route and gateway cache items. 
func (s *subscription) close() { - atomic.StoreInt64(&s.nm, -1) + atomic.StoreInt32(&s.closed, 1) } // Return true if this subscription was unsubscribed // or its connection has been closed. func (s *subscription) isClosed() bool { - return atomic.LoadInt64(&s.nm) == -1 + return atomic.LoadInt32(&s.closed) == 1 } type clientOpts struct { @@ -757,7 +757,6 @@ func (c *client) readLoop() { nc := c.nc s := c.srv c.in.rsz = startBufSize - c.in.losc = time.Now().Unix() // Snapshot max control line since currently can not be changed on reload and we // were checking it on each call to parse. If this changes and we allow MaxControlLine // to be reloaded without restart, this code will need to change. @@ -768,6 +767,10 @@ func (c *client) readLoop() { } } defer s.grWG.Done() + // Check the per-account-cache for closed subscriptions + cpacc := c.kind == ROUTER || c.kind == GATEWAY + // Last per-account-cache check for closed subscriptions + lpacc := time.Now() c.mu.Unlock() if nc == nil { @@ -875,6 +878,11 @@ func (c *client) readLoop() { c.closeConnection(closedStateForErr(err)) return } + + if cpacc && start.Sub(lpacc) >= closedSubsCheckInterval { + c.pruneClosedSubFromPerAccountCache() + lpacc = time.Now() + } } } @@ -2107,6 +2115,14 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { return false } + // This is set under the client lock using atomic because it can be + // checked with atomic without the client lock. Here, we don't need + // the atomic operation since we are under the lock. + if sub.closed == 1 { + client.mu.Unlock() + return false + } + srv := client.srv sub.nm++ @@ -3241,35 +3257,33 @@ func (c *client) Account() *Account { // prunePerAccountCache will prune off a random number of cache entries. 
func (c *client) prunePerAccountCache() { n := 0 - now := time.Now().Unix() - if now-c.in.losc >= orphanSubsCheckInterval { - for cacheKey, pac := range c.in.pacache { - for _, sub := range pac.results.psubs { + for cacheKey := range c.in.pacache { + delete(c.in.pacache, cacheKey) + if n++; n > prunePerAccountCacheSize { + break + } + } +} + +// pruneClosedSubFromPerAccountCache removes entries that contain subscriptions +// that have been closed. +func (c *client) pruneClosedSubFromPerAccountCache() { + for cacheKey, pac := range c.in.pacache { + for _, sub := range pac.results.psubs { + if sub.isClosed() { + goto REMOVE + } + } + for _, qsub := range pac.results.qsubs { + for _, sub := range qsub { if sub.isClosed() { goto REMOVE } } - for _, qsub := range pac.results.qsubs { - for _, sub := range qsub { - if sub.isClosed() { - goto REMOVE - } - } - } - continue - REMOVE: - delete(c.in.pacache, cacheKey) - n++ - } - c.in.losc = now - } - if n < prunePerAccountCacheSize { - for cacheKey := range c.in.pacache { - delete(c.in.pacache, cacheKey) - if n++; n > prunePerAccountCacheSize { - break - } } + continue + REMOVE: + delete(c.in.pacache, cacheKey) } } diff --git a/server/gateway_test.go b/server/gateway_test.go index 8c1dbcdc..55bfed4f 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -4229,7 +4229,21 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) { setAccountUserPassInOptions(oa2, "$foo", "clientA", "password") setAccountUserPassInOptions(oa2, "$bar", "yyyyyyy", "password") oa2.gatewaysSolicitDelay = time.Nanosecond // 0 would be default, so nano to connect asap - sa2 := runGatewayServer(oa2) + var sa2 *Server + sb2ID := sb2.ID() + for i := 0; i < 10; i++ { + sa2 = runGatewayServer(oa2) + ogc := sa2.getOutboundGatewayConnection("B") + if ogc != nil { + ogc.mu.Lock() + ok := ogc.opts.Name == sb2ID + ogc.mu.Unlock() + if ok { + break + } + } + sa2.Shutdown() + } defer sa2.Shutdown() checkClusterFormed(t, sa1, sa2) diff --git
a/server/norace_test.go b/server/norace_test.go index 460bbed1..e7baa909 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -612,12 +612,12 @@ func TestNoRaceRouteMemUsage(t *testing.T) { func TestNoRaceRouteCache(t *testing.T) { maxPerAccountCacheSize = 20 prunePerAccountCacheSize = 5 - orphanSubsCheckInterval = 1 + closedSubsCheckInterval = 250 * time.Millisecond defer func() { maxPerAccountCacheSize = defaultMaxPerAccountCacheSize prunePerAccountCacheSize = defaultPrunePerAccountCacheSize - orphanSubsCheckInterval = defaultOrphanSubsCheckInterval + closedSubsCheckInterval = defaultClosedSubsCheckInterval }() for _, test := range []struct { @@ -705,7 +705,7 @@ func TestNoRaceRouteCache(t *testing.T) { checkExpected(t, (maxPerAccountCacheSize+1)-(prunePerAccountCacheSize+1)) // Wait for more than the orphan check - time.Sleep(1500 * time.Millisecond) + time.Sleep(2 * closedSubsCheckInterval) // Add a new subs up to point where new prune would occur sendReqs(t, requestor, prunePerAccountCacheSize+1, false) @@ -721,7 +721,7 @@ func TestNoRaceRouteCache(t *testing.T) { checkExpected(t, maxPerAccountCacheSize-prunePerAccountCacheSize) // Wait for more than the orphan check - time.Sleep(1500 * time.Millisecond) + time.Sleep(2 * closedSubsCheckInterval) // Now create new connection and send prunePerAccountCacheSize+1 // and that should cause all subs from previous connection to be