diff --git a/server/leafnode.go b/server/leafnode.go index a448c331..842e5a4c 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -508,13 +508,14 @@ func (s *Server) startLeafNodeAcceptLoop() { var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`) // Lock should be held entering here. -func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) error { +func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error { // We support basic user/pass and operator based user JWT with signatures. cinfo := leafConnectInfo{ TLS: tlsRequired, Name: c.srv.info.ID, Hub: c.leaf.remote.Hub, Cluster: clusterName, + Headers: headers, } // Check for credentials first, that will take precedence.. @@ -691,6 +692,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { s.generateNonce(nonce[:]) } clusterName := s.info.Cluster + headers := s.supportsHeaders() s.mu.Unlock() // Grab lock @@ -825,7 +827,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { } } - if err := c.sendLeafConnect(clusterName, tlsRequired); err != nil { + if err := c.sendLeafConnect(clusterName, tlsRequired, headers); err != nil { c.mu.Unlock() c.closeConnection(ProtocolViolation) return nil @@ -888,7 +890,6 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { // Leaf nodes will always require a CONNECT to let us know // when we are properly bound to an account. - // The connection may have been closed c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) } @@ -1161,6 +1162,7 @@ type leafConnectInfo struct { Name string `json:"name,omitempty"` Hub bool `json:"is_hub,omitempty"` Cluster string `json:"cluster,omitempty"` + Headers bool `json:"headers,omitempty"` // Just used to detect wrong connection attempts. Gateway string `json:"gateway,omitempty"` @@ -1194,11 +1196,18 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return ErrWrongGateway } + // Check if this server supports headers. + supportHeaders := c.srv.supportsHeaders() + c.mu.Lock() // Leaf Nodes do not do echo or verbose or pedantic. c.opts.Verbose = false c.opts.Echo = false c.opts.Pedantic = false + // This inbound connection will be marked as supporting headers if this server + // support headers and the remote has sent in the CONNECT protocol that it does + // support headers too. + c.headers = supportHeaders && proto.Headers // If the other side has declared itself a hub, so we will take on the spoke role. if proto.Hub { diff --git a/test/leafnode_test.go b/test/leafnode_test.go index c4d91486..25d974ea 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -4240,3 +4240,61 @@ func TestLeafNodeStreamAndShadowSubs(t *testing.T) { t.Fatalf("Did not receive message: %v", err) } } + +func TestLeafnodeHeaders(t *testing.T) { + srv, opts := runLeafServer() + defer srv.Shutdown() + leaf, _ := runSolicitLeafServer(opts) + defer leaf.Shutdown() + + snc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf(err.Error()) + } + defer snc.Close() + ssub, err := snc.SubscribeSync("test") + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + + lnc, err := nats.Connect(leaf.ClientURL()) + if err != nil { + t.Fatalf(err.Error()) + } + defer lnc.Close() + lsub, err := lnc.SubscribeSync("test") + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + lnc.Flush() + + checkLeafNodeConnected(t, srv) + checkLeafNodeConnected(t, leaf) + checkSubInterest(t, srv, "$G", "test", time.Second) + + msg := nats.NewMsg("test") + msg.Header.Add("Test", "Header") + if len(msg.Header) == 0 { + t.Fatalf("msg header is empty") + } + err = snc.PublishMsg(msg) + if err != nil { + t.Fatalf(err.Error()) + } + + smsg, err := ssub.NextMsg(time.Second) + if err != nil { + t.Fatalf("next failed: %s", err) + } + if len(smsg.Header) == 0 { + t.Fatalf("server msgs header is empty") + } + + lmsg, err := lsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("next failed: %s", err) + } + if len(lmsg.Header) == 0 { + t.Fatalf("leaf msg header is empty") + } +}