From 7ff0ea449a6f368d4df03abfabf44db7b99e00e3 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 9 Jun 2023 12:08:10 -0600 Subject: [PATCH] Fixed issues with leafnode compression negotiation When a server would send an asynchronous INFO to a remote server it would incorrectly contain compression information that could cause issues with one side thinking that the connection should be compressed while the other side was not. It also caused the authentication timer to be incorrectly set which would cause a disconnect. Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster_3_test.go | 7 +-- server/leafnode.go | 37 +++++++++------- server/leafnode_test.go | 69 ++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 21 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index aaf395a4..74840f2d 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3999,10 +3999,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) { server_name: %s jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'} - leaf { - listen: 127.0.0.1:-1 - compression: off - } + leaf { listen: 127.0.0.1:-1 } cluster { name: %s @@ -4059,7 +4056,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) { var leafFrag = ` leaf { listen: 127.0.0.1:-1 - remotes [ { urls: [ %s ], account: "T", compression:off }, { urls: [ %s ], account: "F", compression: off } ] + remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ] }` genLeafTmpl := func(tmpl string, c *cluster) string { diff --git a/server/leafnode.go b/server/leafnode.go index 5a60c433..35cb186a 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -696,6 +696,8 @@ func (s *Server) startLeafNodeAcceptLoop() { tlsRequired := opts.LeafNode.TLSConfig != nil tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert + // Do not set compression in this Info object, it would possibly cause + // issues when sending asynchronous INFO to the remote. info := Info{ ID: s.info.ID, Name: s.info.Name, @@ -712,11 +714,6 @@ func (s *Server) startLeafNodeAcceptLoop() { Proto: 1, // Fixed for now. InfoOnConnect: true, } - // For tests that want to simulate old servers, do not set the compression - // on the INFO protocol if configured with CompressionNotSupported. - if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported { - info.Compression = cm - } // If we have selected a random port... if port == 0 { // Write resolved port back to options. @@ -987,7 +984,11 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf // Grab server variables s.mu.Lock() info = s.copyLeafNodeInfo() - info.Compression = opts.LeafNode.Compression.Mode + // For tests that want to simulate old servers, do not set the compression + // on the INFO protocol if configured with CompressionNotSupported. + if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported { + info.Compression = cm + } s.generateNonce(nonce[:]) s.mu.Unlock() } @@ -1201,14 +1202,21 @@ func (c *client) processLeafnodeInfo(info *Info) { c.leaf.compression = CompressionOff } } + // Accepting side does not normally process an INFO protocol during + // initial connection handshake. So we keep it consistent by returning + // if we are not soliciting. + if !didSolicit { + // If we had created the ping timer instead of the auth timer, we will + // clear the ping timer and set the auth timer now that the compression + // negotiation is done. + if info.Compression != _EMPTY_ && c.ping.tmr != nil { + clearTimer(&c.ping.tmr) + c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) + } + c.mu.Unlock() + return + } // Fall through and process the INFO protocol as usual. - } else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) { - // We used the ping timer instead of auth timer when accepting a remote - // connection so that we can exchange INFO protocols and not have the - // parser return a protocol violation. Now that the negotiation is over - // stop the ping timer and set the auth timer. - clearTimer(&c.ping.tmr) - c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) } // Note: For now, only the initial INFO has a nonce. We @@ -2859,9 +2867,6 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) { // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) - // In case there was compression negotiation, the timer could have been - // already created. Destroy and recreate with different callback. - clearTimer(&c.ping.tmr) // timeout leafNodeFinishConnectProcess c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() { c.mu.Lock() diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e1da45fe..24977604 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5880,3 +5880,72 @@ func TestLeafNodeCompressionWithWSGetNeedsData(t *testing.T) { require_True(t, len(msg.Data) == 156) require_Equal(t, string(msg.Data), payload) } + +func TestLeafNodeCompressionAuthTimeout(t *testing.T) { + hconf := createConfFile(t, []byte(` + port: -1 + server_name: "hub" + leafnodes { + port: -1 + authorization { + timeout: 0.75 + } + } + `)) + sh, oh := RunServerWithConfig(hconf) + defer sh.Shutdown() + + sconfTmpl := ` + port: -1 + server_name: "%s" + cluster { + port: -1 + name: "spoke" + %s + } + leafnodes { + port: -1 + remotes [ + { url: "nats://127.0.0.1:%d" } + ] + } + ` + s1conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP1", _EMPTY_, oh.LeafNode.Port))) + s1, o1 := RunServerWithConfig(s1conf) + defer s1.Shutdown() + + s2conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), oh.LeafNode.Port))) + s2, _ := RunServerWithConfig(s2conf) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + checkLeafNodeConnected(t, s1) + checkLeafNodeConnected(t, s2) + + getCID := func(s *Server) uint64 { + s.mu.RLock() + defer s.mu.RUnlock() + var cid uint64 + for _, l := range s.leafs { + l.mu.Lock() + cid = l.cid + l.mu.Unlock() + } + return cid + } + leaf1 := getCID(s1) + leaf2 := getCID(s2) + + // Wait for more than auth timeout + time.Sleep(time.Second) + + checkLeafNodeConnected(t, s1) + checkLeafNodeConnected(t, s2) + if l1 := getCID(s1); l1 != leaf1 { + t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf1, l1) + } + if l2 := getCID(s2); l2 != leaf2 { + t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2) + } +}