diff --git a/.travis.yml b/.travis.yml index dfe32be5..de2ab9ec 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,7 +25,7 @@ before_script: script: - set -e - go test -i ./... -- go test -run=TestNoRace --failfast -p=1 ./... +- go test -v -run=TestNoRace --failfast -p=1 ./... - if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi - set +e diff --git a/server/client.go b/server/client.go index c1c76d0d..ac724e13 100644 --- a/server/client.go +++ b/server/client.go @@ -76,7 +76,7 @@ const ( var readLoopReportThreshold = readLoopReport // Represent client booleans with a bitmask -type clientFlag byte +type clientFlag uint16 // Some client state represented as flags const ( @@ -88,6 +88,7 @@ const ( flushOutbound // Marks client as having a flushOutbound call in progress. noReconnect // Indicate that on close, this connection should not attempt a reconnect closeConnection // Marks that closeConnection has already been called. + leafAllSubsSent // Indicates that a leaf node has sent the subscription list ) // set the flag (would be equivalent to set the boolean to true) diff --git a/server/leafnode.go b/server/leafnode.go index 0de861fa..066ddc1f 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -926,13 +926,16 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.opts.Pedantic = false // Create and initialize the smap since we know our bound account now. - s.initLeafNodeSmap(c) - + lm := s.initLeafNodeSmap(c) // We are good to go, send over all the bound account subscriptions. - s.startGoRoutine(func() { + if lm <= 128 { c.sendAllLeafSubs() - s.grWG.Done() - }) + } else { + s.startGoRoutine(func() { + c.sendAllLeafSubs() + s.grWG.Done() + }) + } // Add in the leafnode here since we passed through auth at this point. s.addLeafNodeConnection(c) @@ -946,11 +949,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // Snapshot the current subscriptions from the sublist into our smap which // we will keep updated from now on. -func (s *Server) initLeafNodeSmap(c *client) { +func (s *Server) initLeafNodeSmap(c *client) int { acc := c.acc if acc == nil { c.Debugf("Leafnode does not have an account bound") - return + return 0 } // Collect all account subs here. _subs := [32]*subscription{} @@ -1020,7 +1023,9 @@ func (s *Server) initLeafNodeSmap(c *client) { if c.leaf.remote == nil { c.leaf.smap[lds]++ } + lenMap := len(c.leaf.smap) c.mu.Unlock() + return lenMap } // updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-. @@ -1080,7 +1085,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) { } else { delete(c.leaf.smap, key) } - if update { + if update && c.flags.isSet(leafAllSubsSent) { c.sendLeafNodeSubUpdate(key, n) } c.mu.Unlock() @@ -1119,6 +1124,10 @@ func (c *client) sendAllLeafSubs() { var b bytes.Buffer c.mu.Lock() + // Set the flag here before first call to flushOutbound() since that + // releases the lock and so an update could sneak in. + c.flags.set(leafAllSubsSent) + for key, n := range c.leaf.smap { c.writeLeafSub(&b, key, n) } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 853b96be..d9d60fd7 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -747,9 +747,17 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() _, leafExpect := setupConn(t, lc) - buf := leafExpect(lsubRe) - if !strings.Contains(string(buf), "foo") { - t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf) + var totalBuf []byte + for count := 0; count != 3; { + buf := leafExpect(lsubRe) + totalBuf = append(totalBuf, buf...) + count += len(lsubRe.FindAllSubmatch(buf, -1)) + if count > 3 { + t.Fatalf("Expected %v matches, got %v (buf=%s)", 3, count, totalBuf) + } + } + if !strings.Contains(string(totalBuf), "foo") { + t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", totalBuf) } }