[FIXED] Unsubscribe may not be propagated through a leaf node

There is a race between the time the processing of a subscription
and the init/send of subscriptions when accepting a leaf node
connection that may cause internally a subscription's subject
to be counted many times, which would then prevent the send of
an LS- when the subscription's interest goes away.

Imagine this sequence of events, each side represents a "thread"
of execution:
```
client readLoop                         leaf node readLoop
----------------------------------------------------------
recv SUB foo 1
sub added to account's sublist

                                         recv CONNECT
                                     auth, added to acc.

updateSmap
smap["foo"]++ -> 1
no LS+ because !allSubsSent

                                         init smap
                                    finds sub in acc sl
                                    smap["foo"]++ -> 2
                                        sends LS+ foo
                                    allSubsSent == true

recv UNSUB 1
updateSmap
smap["foo"]-- -> 1
no LS- because count != 0
----------------------------------------------------------
```
Equivalent result but with slightly different execution:
```
client readLoop                         leaf node readLoop
----------------------------------------------------------
recv SUB foo 1
sub added to account's sublist

                                         recv CONNECT
                                     auth, added to acc.

                                         init smap
                                    finds sub in acc sl
                                    smap["foo"]++ -> 1
                                        sends LS+ foo
                                    allSubsSent == true

updateSmap
smap["foo"]++ -> 2
no LS+ because count != 1

recv UNSUB 1
updateSmap
smap["foo"]-- -> 1
no LS- because count != 0
----------------------------------------------------------
```

The approach for the fix is to delay the creation of the smap
until we actually initialize the map and send the subs on processing
of the CONNECT.
In the meantime, as soon as the LN connection is registered
and available in updateSmap, we check that smap is nil or
not. If nil, we do nothing.

In "init smap" we keep track of the subscriptions that have been
added to smap. This map will be short lived, just enough to
protect against races above.

In updateSmap, when smap is not nil, we need to check, if we
are adding, that the subscription has not already been handled.
The temporary subscription map will be ultimately emptied/set to
nil with the use of a timer (if not emptied in place when
processing smap updates).

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-06-05 09:31:34 -06:00
parent eed381d617
commit 25bd5ca352
3 changed files with 155 additions and 42 deletions

View File

@@ -112,7 +112,6 @@ const (
writeLoopStarted // Marks that the writeLoop has been started.
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
allSubsSent // Marks this leafnode connection as having sent all subs.
)
// set the flag (would be equivalent to set the boolean to true)

View File

@@ -61,6 +61,15 @@ type leaf struct {
// isSpoke tells us what role we are playing.
// Used when we receive a connection but the other side tells us they are a hub.
isSpoke bool
// This map will contain all the subscriptions that have been added to the smap
// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
// race between processing of a sub where sub is added to account sublist but
// updateSmap has not been called on that "thread", while in the LN readloop,
// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
// this subscription to smap. When processing of the sub then calls updateSmap,
// we would add it a second time in the smap causing later unsub to suppress the LS-.
tsub map[*subscription]struct{}
tsubt *time.Timer
}
// Used for remote (solicited) leafnodes.
@@ -589,7 +598,8 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
now := time.Now()
c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
c.leaf = &leaf{smap: map[string]int32{}}
// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
c.leaf = &leaf{}
// Determines if we are soliciting the connection or not.
var solicited bool
@@ -821,8 +831,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
// Make sure we register with the account here.
c.registerWithAccount(c.acc)
s.addLeafNodeConnection(c)
s.initLeafNodeSmap(c)
c.sendAllLeafSubs()
s.initLeafNodeSmapAndSendSubs(c)
if sendSysConnectEvent {
s.sendLeafNodeConnect(c.acc)
}
@@ -969,6 +978,10 @@ func (s *Server) addLeafNodeConnection(c *client) {
func (s *Server) removeLeafNodeConnection(c *client) {
c.mu.Lock()
cid := c.cid
if c.leaf != nil && c.leaf.tsubt != nil {
c.leaf.tsubt.Stop()
c.leaf.tsubt = nil
}
c.mu.Unlock()
s.mu.Lock()
delete(s.leafs, cid)
@@ -1028,16 +1041,8 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
}
// Create and initialize the smap since we know our bound account now.
lm := s.initLeafNodeSmap(c)
// We are good to go, send over all the bound account subscriptions.
if lm <= 128 {
c.sendAllLeafSubs()
} else {
s.startGoRoutine(func() {
c.sendAllLeafSubs()
s.grWG.Done()
})
}
// This will send all registered subs too.
s.initLeafNodeSmapAndSendSubs(c)
// Add in the leafnode here since we passed through auth at this point.
s.addLeafNodeConnection(c)
@@ -1051,11 +1056,12 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
func (s *Server) initLeafNodeSmap(c *client) int {
// Also send the registered subscriptions.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
acc := c.acc
if acc == nil {
c.Debugf("Leafnode does not have an account bound")
return 0
return
}
// Collect all account subs here.
_subs := [32]*subscription{}
@@ -1113,10 +1119,15 @@ func (s *Server) initLeafNodeSmap(c *client) int {
// Now walk the results and add them to our smap
c.mu.Lock()
c.leaf.smap = make(map[string]int32)
for _, sub := range subs {
// We ignore ourselves here.
if c != sub.client {
c.leaf.smap[keyFromSub(sub)]++
if c.leaf.tsub == nil {
c.leaf.tsub = make(map[*subscription]struct{})
}
c.leaf.tsub[sub] = struct{}{}
}
}
// FIXME(dlc) - We need to update appropriately on an account claims update.
@@ -1140,10 +1151,28 @@ func (s *Server) initLeafNodeSmap(c *client) int {
wcsub := append(siReply, '>')
c.leaf.smap[string(wcsub)]++
}
lenMap := len(c.leaf.smap)
// Queue all protocols. There is no max pending limit for LN connection,
// so we don't need chunking. The writes will happen from the writeLoop.
var b bytes.Buffer
for key, n := range c.leaf.smap {
c.writeLeafSub(&b, key, n)
}
if b.Len() > 0 {
c.queueOutbound(b.Bytes())
c.flushSignal()
}
if c.leaf.tsub != nil {
// Clear the tsub map after 5 seconds.
c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
c.mu.Lock()
if c.leaf != nil {
c.leaf.tsub = nil
c.leaf.tsubt = nil
}
c.mu.Unlock()
})
}
c.mu.Unlock()
return lenMap
}
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
@@ -1186,6 +1215,10 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
key := keyFromSub(sub)
c.mu.Lock()
if c.leaf.smap == nil {
c.mu.Unlock()
return
}
// If we are solicited make sure this is a local client or a non-solicited leaf node
skind := sub.client.kind
@@ -1195,6 +1228,20 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
return
}
// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
if delta > 0 && c.leaf.tsub != nil {
if _, present := c.leaf.tsub[sub]; present {
delete(c.leaf.tsub, sub)
if len(c.leaf.tsub) == 0 {
c.leaf.tsub = nil
c.leaf.tsubt.Stop()
c.leaf.tsubt = nil
}
c.mu.Unlock()
return
}
}
n := c.leaf.smap[key]
// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
update := sub.queue != nil || n == 0 || n+delta <= 0
@@ -1204,8 +1251,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
} else {
delete(c.leaf.smap, key)
}
// Don't send in front of all subs.
if update && c.flags.isSet(allSubsSent) {
if update {
c.sendLeafNodeSubUpdate(key, n)
}
c.mu.Unlock()
@@ -1237,25 +1283,6 @@ func keyFromSub(sub *subscription) string {
return string(key)
}
// Send all subscriptions for this account that include local
// and possibly all other remote subscriptions.
func (c *client) sendAllLeafSubs() {
// Hold all at once for now.
var b bytes.Buffer
c.mu.Lock()
for key, n := range c.leaf.smap {
c.writeLeafSub(&b, key, n)
}
buf := b.Bytes()
if len(buf) > 0 {
c.queueOutbound(buf)
c.flushSignal()
}
c.flags.set(allSubsSent)
c.mu.Unlock()
}
// Lock should be held.
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
if key == "" {

View File

@@ -37,6 +37,8 @@ import (
)
// IMPORTANT: Tests in this file are not executed when running with the -race flag.
// The test name should be prefixed with TestNoRace so we can run only
// those tests: go test -run=TestNoRace ...
func TestNoRaceRouteSendSubs(t *testing.T) {
template := `
@@ -340,7 +342,7 @@ func TestNoRaceLargeClusterMem(t *testing.T) {
// Make sure we have the correct remote state when dealing with queue subscribers
// across many client connections.
func TestQueueSubWeightOrderMultipleConnections(t *testing.T) {
func TestNoRaceQueueSubWeightOrderMultipleConnections(t *testing.T) {
opts, err := server.ProcessConfigFile("./configs/new_cluster.conf")
if err != nil {
t.Fatalf("Error processing config file: %v", err)
@@ -547,7 +549,7 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) {
})
}
func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
func TestNoRaceJetStreamWorkQueueLoadBalance(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
@@ -631,3 +633,88 @@ func TestJetStreamWorkQueueLoadBalance(t *testing.T) {
}
}
}
func TestNoRaceLeafNodeSmapUpdate(t *testing.T) {
s, opts := runLeafServer()
defer s.Shutdown()
// Create a client on leaf server
c := createClientConn(t, opts.Host, opts.Port)
defer c.Close()
csend, cexpect := setupConn(t, c)
numSubs := make(chan int, 1)
doneCh := make(chan struct{}, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; ; i++ {
csend(fmt.Sprintf("SUB foo.%d %d\r\n", i, i))
select {
case <-doneCh:
numSubs <- i
return
default:
}
}
}()
time.Sleep(5 * time.Millisecond)
// Create leaf node
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
setupConn(t, lc)
checkLeafNodeConnected(t, s)
close(doneCh)
ns := <-numSubs
csend("PING\r\n")
cexpect(pongRe)
wg.Wait()
// Make sure we receive as many LS+ protocols (since all subs are unique).
// But we also have to count for LDS subject.
// There may be so many protocols and partials, that expectNumberOfProtos may
// not work. Do a manual search here.
checkLS := func(proto string, expected int) {
t.Helper()
p := []byte(proto)
cur := 0
buf := make([]byte, 32768)
for ls := 0; ls < expected; {
lc.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := lc.Read(buf)
lc.SetReadDeadline(time.Time{})
if err == nil && n > 0 {
for i := 0; i < n; i++ {
if buf[i] == p[cur] {
cur++
if cur == len(p) {
ls++
cur = 0
}
} else {
cur = 0
}
}
}
if err != nil || ls > expected {
t.Fatalf("Expected %v %sgot %v, err: %v", expected, proto, ls, err)
}
}
}
checkLS("LS+ ", ns+1)
// Now unsub all those subs...
for i := 1; i <= ns; i++ {
csend(fmt.Sprintf("UNSUB %d\r\n", i))
}
csend("PING\r\n")
cexpect(pongRe)
// Expect that many LS-
checkLS("LS- ", ns)
}