Fixed issues with leafnode compression negotiation

When a server would send an asynchronous INFO to a remote server
it would incorrectly contain compression information that could
cause issues with one side thinking that the connection should
be compressed while the other side was not.

It also caused the authentication timer to be incorrectly set
which would cause a disconnect.

Signed-off-by: Ivan Kozlovic <ijkozlovic@gmail.com>
This commit is contained in:
Ivan Kozlovic
2023-06-09 12:08:10 -06:00
parent a1f03513d8
commit 7ff0ea449a
3 changed files with 92 additions and 21 deletions

View File

@@ -3999,10 +3999,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
compression: off
}
leaf { listen: 127.0.0.1:-1 }
cluster {
name: %s
@@ -4059,7 +4056,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [ { urls: [ %s ], account: "T", compression:off }, { urls: [ %s ], account: "F", compression: off } ]
remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ]
}`
genLeafTmpl := func(tmpl string, c *cluster) string {

View File

@@ -696,6 +696,8 @@ func (s *Server) startLeafNodeAcceptLoop() {
tlsRequired := opts.LeafNode.TLSConfig != nil
tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
// Do not set compression in this Info object, it would possibly cause
// issues when sending asynchronous INFO to the remote.
info := Info{
ID: s.info.ID,
Name: s.info.Name,
@@ -712,11 +714,6 @@ func (s *Server) startLeafNodeAcceptLoop() {
Proto: 1, // Fixed for now.
InfoOnConnect: true,
}
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
info.Compression = cm
}
// If we have selected a random port...
if port == 0 {
// Write resolved port back to options.
@@ -987,7 +984,11 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
// Grab server variables
s.mu.Lock()
info = s.copyLeafNodeInfo()
info.Compression = opts.LeafNode.Compression.Mode
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
info.Compression = cm
}
s.generateNonce(nonce[:])
s.mu.Unlock()
}
@@ -1201,14 +1202,21 @@ func (c *client) processLeafnodeInfo(info *Info) {
c.leaf.compression = CompressionOff
}
}
// Accepting side does not normally process an INFO protocol during
// initial connection handshake. So we keep it consistent by returning
// if we are not soliciting.
if !didSolicit {
// If we had created the ping timer instead of the auth timer, we will
// clear the ping timer and set the auth timer now that the compression
// negotiation is done.
if info.Compression != _EMPTY_ && c.ping.tmr != nil {
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}
c.mu.Unlock()
return
}
// Fall through and process the INFO protocol as usual.
} else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) {
// We used the ping timer instead of auth timer when accepting a remote
// connection so that we can exchange INFO protocols and not have the
// parser return a protocol violation. Now that the negotiation is over
// stop the ping timer and set the auth timer.
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}
// Note: For now, only the initial INFO has a nonce. We
@@ -2859,9 +2867,6 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })
// In case there was compression negotiation, the timer could have been
// already created. Destroy and recreate with different callback.
clearTimer(&c.ping.tmr)
// timeout leafNodeFinishConnectProcess
c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
c.mu.Lock()

View File

@@ -5880,3 +5880,72 @@ func TestLeafNodeCompressionWithWSGetNeedsData(t *testing.T) {
require_True(t, len(msg.Data) == 156)
require_Equal(t, string(msg.Data), payload)
}
func TestLeafNodeCompressionAuthTimeout(t *testing.T) {
hconf := createConfFile(t, []byte(`
port: -1
server_name: "hub"
leafnodes {
port: -1
authorization {
timeout: 0.75
}
}
`))
sh, oh := RunServerWithConfig(hconf)
defer sh.Shutdown()
sconfTmpl := `
port: -1
server_name: "%s"
cluster {
port: -1
name: "spoke"
%s
}
leafnodes {
port: -1
remotes [
{ url: "nats://127.0.0.1:%d" }
]
}
`
s1conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP1", _EMPTY_, oh.LeafNode.Port)))
s1, o1 := RunServerWithConfig(s1conf)
defer s1.Shutdown()
s2conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), oh.LeafNode.Port)))
s2, _ := RunServerWithConfig(s2conf)
defer s2.Shutdown()
checkClusterFormed(t, s1, s2)
checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)
getCID := func(s *Server) uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
var cid uint64
for _, l := range s.leafs {
l.mu.Lock()
cid = l.cid
l.mu.Unlock()
}
return cid
}
leaf1 := getCID(s1)
leaf2 := getCID(s2)
// Wait for more than auth timeout
time.Sleep(time.Second)
checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)
if l1 := getCID(s1); l1 != leaf1 {
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf1, l1)
}
if l2 := getCID(s2); l2 != leaf2 {
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2)
}
}