mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed subscription close
I noticed that TestNoRaceRoutedQueueAutoUnsubscribe started to fail a lot on Travis. Running locally I could see a 45 to 50% failure rate. After investigation I realized that the issue was that we had wrongly re-used `subscription.nm` and set it to -1 on unsubscribe; however, I believe that it was possible that when the subscription was closed, the server may have already picked that consumer for a delivery, which then caused nm==-1 to be bumped to 0, which was wrong. Commenting out the subscription.close() that sets nm to -1, I could not get the test to fail on macOS but would still get a 7% failure rate on a Linux VM. Adding the check to see if the sub is closed in deliverMsg() completely erased the failures, even on the Linux VM. We could still use `nm` set to -1 but check it in deliverMsg(), the same way I use the closed int32 now. Fixed some flappers. Updated .travis.yml to fail fast if one of the commands in the `script` section fails. Use `set -e` and `set +e` as recommended in https://github.com/travis-ci/travis-ci/issues/1066 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -23,9 +23,11 @@ before_script:
|
||||
- misspell -error -locale US $EXCLUDE_VENDOR
|
||||
- staticcheck $EXCLUDE_VENDOR
|
||||
script:
|
||||
- set -e
|
||||
- go test -i $EXCLUDE_VENDOR
|
||||
- go test -run=TestNoRace $EXCLUDE_VENDOR
|
||||
- go test -run=TestNoRace --failfast -p=1 $EXCLUDE_VENDOR
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast $EXCLUDE_VENDOR; fi
|
||||
- set +e
|
||||
|
||||
deploy:
|
||||
provider: script
|
||||
|
||||
@@ -1496,22 +1496,32 @@ func TestCrossAccountServiceResponseLeaks(t *testing.T) {
|
||||
|
||||
// Now send some requests..We will not respond.
|
||||
var sb strings.Builder
|
||||
for i := 0; i < 100; i++ {
|
||||
for i := 0; i < 50; i++ {
|
||||
sb.WriteString(fmt.Sprintf("PUB foo REPLY.%d 4\r\nhelp\r\n", i))
|
||||
}
|
||||
go cbar.parseAndFlush([]byte(sb.String()))
|
||||
|
||||
// Make sure requests are processed.
|
||||
_, err := crFoo.ReadString('\n')
|
||||
if err != nil {
|
||||
if _, err := crFoo.ReadString('\n'); err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
|
||||
// We should have leaked response maps.
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 100 {
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 50 {
|
||||
t.Fatalf("Expected response maps to be present, got %d", nr)
|
||||
}
|
||||
|
||||
sb.Reset()
|
||||
for i := 50; i < 100; i++ {
|
||||
sb.WriteString(fmt.Sprintf("PUB foo REPLY.%d 4\r\nhelp\r\n", i))
|
||||
}
|
||||
go cbar.parseAndFlush([]byte(sb.String()))
|
||||
|
||||
// Make sure requests are processed.
|
||||
if _, err := crFoo.ReadString('\n'); err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
|
||||
// They should be gone here eventually.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nr := fooAcc.numServiceRoutes(); nr != 0 {
|
||||
|
||||
@@ -280,7 +280,6 @@ type readCache struct {
|
||||
|
||||
// This is for routes and gateways to have their own L1 as well that is account aware.
|
||||
pacache map[string]*perAccountCache
|
||||
losc int64 // last orphan subs check
|
||||
|
||||
// This is for when we deliver messages across a route. We use this structure
|
||||
// to make sure to only send one message and properly scope to queues as needed.
|
||||
@@ -300,13 +299,13 @@ type readCache struct {
|
||||
const (
|
||||
defaultMaxPerAccountCacheSize = 4096
|
||||
defaultPrunePerAccountCacheSize = 256
|
||||
defaultOrphanSubsCheckInterval = int64(5 * 60) //5 min in number of seconds
|
||||
defaultClosedSubsCheckInterval = 5 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
maxPerAccountCacheSize = defaultMaxPerAccountCacheSize
|
||||
prunePerAccountCacheSize = defaultPrunePerAccountCacheSize
|
||||
orphanSubsCheckInterval = defaultOrphanSubsCheckInterval
|
||||
closedSubsCheckInterval = defaultClosedSubsCheckInterval
|
||||
)
|
||||
|
||||
// perAccountCache is for L1 semantics for inbound messages from a route or gateway to mimic the performance of clients.
|
||||
@@ -340,27 +339,28 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
|
||||
// FIXME(dlc) - This is getting bloated for normal subs, need
|
||||
// to optionally have an opts section for non-normal stuff.
|
||||
type subscription struct {
|
||||
nm int64 // Will atomically be set to -1 on unsub or connection close
|
||||
client *client
|
||||
im *streamImport // This is for import stream support.
|
||||
shadow []*subscription // This is to track shadowed accounts.
|
||||
subject []byte
|
||||
queue []byte
|
||||
sid []byte
|
||||
nm int64
|
||||
max int64
|
||||
qw int32
|
||||
closed int32
|
||||
}
|
||||
|
||||
// Indicate that this subscription is closed.
|
||||
// This is used in pruning of route and gateway cache items.
|
||||
func (s *subscription) close() {
|
||||
atomic.StoreInt64(&s.nm, -1)
|
||||
atomic.StoreInt32(&s.closed, 1)
|
||||
}
|
||||
|
||||
// Return true if this subscription was unsubscribed
|
||||
// or its connection has been closed.
|
||||
func (s *subscription) isClosed() bool {
|
||||
return atomic.LoadInt64(&s.nm) == -1
|
||||
return atomic.LoadInt32(&s.closed) == 1
|
||||
}
|
||||
|
||||
type clientOpts struct {
|
||||
@@ -757,7 +757,6 @@ func (c *client) readLoop() {
|
||||
nc := c.nc
|
||||
s := c.srv
|
||||
c.in.rsz = startBufSize
|
||||
c.in.losc = time.Now().Unix()
|
||||
// Snapshot max control line since currently can not be changed on reload and we
|
||||
// were checking it on each call to parse. If this changes and we allow MaxControlLine
|
||||
// to be reloaded without restart, this code will need to change.
|
||||
@@ -768,6 +767,10 @@ func (c *client) readLoop() {
|
||||
}
|
||||
}
|
||||
defer s.grWG.Done()
|
||||
// Check the per-account-cache for closed subscriptions
|
||||
cpacc := c.kind == ROUTER || c.kind == GATEWAY
|
||||
// Last per-account-cache check for closed subscriptions
|
||||
lpacc := time.Now()
|
||||
c.mu.Unlock()
|
||||
|
||||
if nc == nil {
|
||||
@@ -875,6 +878,11 @@ func (c *client) readLoop() {
|
||||
c.closeConnection(closedStateForErr(err))
|
||||
return
|
||||
}
|
||||
|
||||
if cpacc && start.Sub(lpacc) >= closedSubsCheckInterval {
|
||||
c.pruneClosedSubFromPerAccountCache()
|
||||
lpacc = time.Now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2107,6 +2115,14 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// This is set under the client lock using atomic because it can be
|
||||
// checked with atomic without the client lock. Here, we don't need
|
||||
// the atomic operation since we are under the lock.
|
||||
if sub.closed == 1 {
|
||||
client.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
srv := client.srv
|
||||
|
||||
sub.nm++
|
||||
@@ -3241,35 +3257,33 @@ func (c *client) Account() *Account {
|
||||
// prunePerAccountCache will prune off a random number of cache entries.
|
||||
func (c *client) prunePerAccountCache() {
|
||||
n := 0
|
||||
now := time.Now().Unix()
|
||||
if now-c.in.losc >= orphanSubsCheckInterval {
|
||||
for cacheKey, pac := range c.in.pacache {
|
||||
for _, sub := range pac.results.psubs {
|
||||
for cacheKey := range c.in.pacache {
|
||||
delete(c.in.pacache, cacheKey)
|
||||
if n++; n > prunePerAccountCacheSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pruneClosedSubFromPerAccountCache remove entries that contain subscriptions
|
||||
// that have been closed.
|
||||
func (c *client) pruneClosedSubFromPerAccountCache() {
|
||||
for cacheKey, pac := range c.in.pacache {
|
||||
for _, sub := range pac.results.psubs {
|
||||
if sub.isClosed() {
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
for _, qsub := range pac.results.qsubs {
|
||||
for _, sub := range qsub {
|
||||
if sub.isClosed() {
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
for _, qsub := range pac.results.qsubs {
|
||||
for _, sub := range qsub {
|
||||
if sub.isClosed() {
|
||||
goto REMOVE
|
||||
}
|
||||
}
|
||||
}
|
||||
continue
|
||||
REMOVE:
|
||||
delete(c.in.pacache, cacheKey)
|
||||
n++
|
||||
}
|
||||
c.in.losc = now
|
||||
}
|
||||
if n < prunePerAccountCacheSize {
|
||||
for cacheKey := range c.in.pacache {
|
||||
delete(c.in.pacache, cacheKey)
|
||||
if n++; n > prunePerAccountCacheSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
continue
|
||||
REMOVE:
|
||||
delete(c.in.pacache, cacheKey)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4229,7 +4229,21 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) {
|
||||
setAccountUserPassInOptions(oa2, "$foo", "clientA", "password")
|
||||
setAccountUserPassInOptions(oa2, "$bar", "yyyyyyy", "password")
|
||||
oa2.gatewaysSolicitDelay = time.Nanosecond // 0 would be default, so nano to connect asap
|
||||
sa2 := runGatewayServer(oa2)
|
||||
var sa2 *Server
|
||||
sb2ID := sb2.ID()
|
||||
for i := 0; i < 10; i++ {
|
||||
sa2 = runGatewayServer(oa2)
|
||||
ogc := sa2.getOutboundGatewayConnection("B")
|
||||
if ogc != nil {
|
||||
ogc.mu.Lock()
|
||||
ok := ogc.opts.Name == sb2ID
|
||||
ogc.mu.Unlock()
|
||||
if ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
sa2.Shutdown()
|
||||
}
|
||||
defer sa2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, sa1, sa2)
|
||||
|
||||
@@ -612,12 +612,12 @@ func TestNoRaceRouteMemUsage(t *testing.T) {
|
||||
func TestNoRaceRouteCache(t *testing.T) {
|
||||
maxPerAccountCacheSize = 20
|
||||
prunePerAccountCacheSize = 5
|
||||
orphanSubsCheckInterval = 1
|
||||
closedSubsCheckInterval = 250 * time.Millisecond
|
||||
|
||||
defer func() {
|
||||
maxPerAccountCacheSize = defaultMaxPerAccountCacheSize
|
||||
prunePerAccountCacheSize = defaultPrunePerAccountCacheSize
|
||||
orphanSubsCheckInterval = defaultOrphanSubsCheckInterval
|
||||
closedSubsCheckInterval = defaultClosedSubsCheckInterval
|
||||
}()
|
||||
|
||||
for _, test := range []struct {
|
||||
@@ -705,7 +705,7 @@ func TestNoRaceRouteCache(t *testing.T) {
|
||||
checkExpected(t, (maxPerAccountCacheSize+1)-(prunePerAccountCacheSize+1))
|
||||
|
||||
// Wait for more than the orphan check
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
time.Sleep(2 * closedSubsCheckInterval)
|
||||
|
||||
// Add a new subs up to point where new prune would occur
|
||||
sendReqs(t, requestor, prunePerAccountCacheSize+1, false)
|
||||
@@ -721,7 +721,7 @@ func TestNoRaceRouteCache(t *testing.T) {
|
||||
checkExpected(t, maxPerAccountCacheSize-prunePerAccountCacheSize)
|
||||
|
||||
// Wait for more than the orphan check
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
time.Sleep(2 * closedSubsCheckInterval)
|
||||
|
||||
// Now create new connection and send prunePerAccountCacheSize+1
|
||||
// and that should cause all subs from previous connection to be
|
||||
|
||||
Reference in New Issue
Block a user