diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index aaf395a4..74840f2d 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3999,10 +3999,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) { server_name: %s jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'} - leaf { - listen: 127.0.0.1:-1 - compression: off - } + leaf { listen: 127.0.0.1:-1 } cluster { name: %s @@ -4059,7 +4056,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) { var leafFrag = ` leaf { listen: 127.0.0.1:-1 - remotes [ { urls: [ %s ], account: "T", compression:off }, { urls: [ %s ], account: "F", compression: off } ] + remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ] }` genLeafTmpl := func(tmpl string, c *cluster) string { diff --git a/server/leafnode.go b/server/leafnode.go index 5a60c433..35cb186a 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -696,6 +696,8 @@ func (s *Server) startLeafNodeAcceptLoop() { tlsRequired := opts.LeafNode.TLSConfig != nil tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert + // Do not set compression in this Info object, it would possibly cause + // issues when sending asynchronous INFO to the remote. info := Info{ ID: s.info.ID, Name: s.info.Name, @@ -712,11 +714,6 @@ func (s *Server) startLeafNodeAcceptLoop() { Proto: 1, // Fixed for now. InfoOnConnect: true, } - // For tests that want to simulate old servers, do not set the compression - // on the INFO protocol if configured with CompressionNotSupported. - if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported { - info.Compression = cm - } // If we have selected a random port... if port == 0 { // Write resolved port back to options. @@ -987,7 +984,11 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf // Grab server variables s.mu.Lock() info = s.copyLeafNodeInfo() - info.Compression = opts.LeafNode.Compression.Mode + // For tests that want to simulate old servers, do not set the compression + // on the INFO protocol if configured with CompressionNotSupported. + if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported { + info.Compression = cm + } s.generateNonce(nonce[:]) s.mu.Unlock() } @@ -1201,14 +1202,21 @@ func (c *client) processLeafnodeInfo(info *Info) { c.leaf.compression = CompressionOff } } + // Accepting side does not normally process an INFO protocol during + // initial connection handshake. So we keep it consistent by returning + // if we are not soliciting. + if !didSolicit { + // If we had created the ping timer instead of the auth timer, we will + // clear the ping timer and set the auth timer now that the compression + // negotiation is done. + if info.Compression != _EMPTY_ && c.ping.tmr != nil { + clearTimer(&c.ping.tmr) + c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) + } + c.mu.Unlock() + return + } // Fall through and process the INFO protocol as usual. - } else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) { - // We used the ping timer instead of auth timer when accepting a remote - // connection so that we can exchange INFO protocols and not have the - // parser return a protocol violation. Now that the negotiation is over - // stop the ping timer and set the auth timer. - clearTimer(&c.ping.tmr) - c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) } // Note: For now, only the initial INFO has a nonce. We @@ -2859,9 +2867,6 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) { // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) - // In case there was compression negotiation, the timer could have been - // already created. Destroy and recreate with different callback. - clearTimer(&c.ping.tmr) // timeout leafNodeFinishConnectProcess c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() { c.mu.Lock() diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e1da45fe..24977604 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5880,3 +5880,72 @@ func TestLeafNodeCompressionWithWSGetNeedsData(t *testing.T) { require_True(t, len(msg.Data) == 156) require_Equal(t, string(msg.Data), payload) } + +func TestLeafNodeCompressionAuthTimeout(t *testing.T) { + hconf := createConfFile(t, []byte(` + port: -1 + server_name: "hub" + leafnodes { + port: -1 + authorization { + timeout: 0.75 + } + } + `)) + sh, oh := RunServerWithConfig(hconf) + defer sh.Shutdown() + + sconfTmpl := ` + port: -1 + server_name: "%s" + cluster { + port: -1 + name: "spoke" + %s + } + leafnodes { + port: -1 + remotes [ + { url: "nats://127.0.0.1:%d" } + ] + } + ` + s1conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP1", _EMPTY_, oh.LeafNode.Port))) + s1, o1 := RunServerWithConfig(s1conf) + defer s1.Shutdown() + + s2conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), oh.LeafNode.Port))) + s2, _ := RunServerWithConfig(s2conf) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + checkLeafNodeConnected(t, s1) + checkLeafNodeConnected(t, s2) + + getCID := func(s *Server) uint64 { + s.mu.RLock() + defer s.mu.RUnlock() + var cid uint64 + for _, l := range s.leafs { + l.mu.Lock() + cid = l.cid + l.mu.Unlock() + } + return cid + } + leaf1 := getCID(s1) + leaf2 := getCID(s2) + + // Wait for more than auth timeout + time.Sleep(time.Second) + + checkLeafNodeConnected(t, s1) + checkLeafNodeConnected(t, s2) + if l1 := getCID(s1); l1 != leaf1 { + t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf1, l1) + } + if l2 := getCID(s2); l2 != leaf2 { + t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2) + } +}