Merge pull request #1177 from nats-io/prevent_ln_update_before_all_subs_sent

Some update to leafnode subscription handling
This commit is contained in:
Ivan Kozlovic
2019-10-30 20:35:07 -06:00
committed by GitHub
4 changed files with 31 additions and 13 deletions

View File

@@ -25,7 +25,7 @@ before_script:
script:
- set -e
- go test -i ./...
- go test -run=TestNoRace --failfast -p=1 ./...
- go test -v -run=TestNoRace --failfast -p=1 ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi
- set +e

View File

@@ -76,7 +76,7 @@ const (
var readLoopReportThreshold = readLoopReport
// Represent client booleans with a bitmask
type clientFlag byte
type clientFlag uint16
// Some client state represented as flags
const (
@@ -88,6 +88,7 @@ const (
flushOutbound // Marks client as having a flushOutbound call in progress.
noReconnect // Indicate that on close, this connection should not attempt a reconnect
closeConnection // Marks that closeConnection has already been called.
leafAllSubsSent // Indicates that a leaf node has sent the subscription list
)
// set the flag (would be equivalent to set the boolean to true)

View File

@@ -926,13 +926,16 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
c.opts.Pedantic = false
// Create and initialize the smap since we know our bound account now.
s.initLeafNodeSmap(c)
lm := s.initLeafNodeSmap(c)
// We are good to go, send over all the bound account subscriptions.
s.startGoRoutine(func() {
if lm <= 128 {
c.sendAllLeafSubs()
s.grWG.Done()
})
} else {
s.startGoRoutine(func() {
c.sendAllLeafSubs()
s.grWG.Done()
})
}
// Add in the leafnode here since we passed through auth at this point.
s.addLeafNodeConnection(c)
@@ -946,11 +949,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
func (s *Server) initLeafNodeSmap(c *client) {
func (s *Server) initLeafNodeSmap(c *client) int {
acc := c.acc
if acc == nil {
c.Debugf("Leafnode does not have an account bound")
return
return 0
}
// Collect all account subs here.
_subs := [32]*subscription{}
@@ -1020,7 +1023,9 @@ func (s *Server) initLeafNodeSmap(c *client) {
if c.leaf.remote == nil {
c.leaf.smap[lds]++
}
lenMap := len(c.leaf.smap)
c.mu.Unlock()
return lenMap
}
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
@@ -1080,7 +1085,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
} else {
delete(c.leaf.smap, key)
}
if update {
if update && c.flags.isSet(leafAllSubsSent) {
c.sendLeafNodeSubUpdate(key, n)
}
c.mu.Unlock()
@@ -1119,6 +1124,10 @@ func (c *client) sendAllLeafSubs() {
var b bytes.Buffer
c.mu.Lock()
// Set the flag here before first call to flushOutbound() since that
// releases the lock and so an update could sneak in.
c.flags.set(leafAllSubsSent)
for key, n := range c.leaf.smap {
c.writeLeafSub(&b, key, n)
}

View File

@@ -747,9 +747,17 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
_, leafExpect := setupConn(t, lc)
buf := leafExpect(lsubRe)
if !strings.Contains(string(buf), "foo") {
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf)
var totalBuf []byte
for count := 0; count != 3; {
buf := leafExpect(lsubRe)
totalBuf = append(totalBuf, buf...)
count += len(lsubRe.FindAllSubmatch(buf, -1))
if count > 3 {
t.Fatalf("Expected %v matches, got %v (buf=%s)", 3, count, totalBuf)
}
}
if !strings.Contains(string(totalBuf), "foo") {
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", totalBuf)
}
}