mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed headers support for inbound leafnode connection
The server that solicits a LeafNode connection does not send an INFO, so the accepting side had no way to know if the remote supports headers or not. The solicit side will now send the headers support capability in the CONNECT protocol so that the receiving side can mark the inbound connection with headers support based on that and its own support for headers. Resolves #1781 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -508,13 +508,14 @@ func (s *Server) startLeafNodeAcceptLoop() {
|
||||
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`)
|
||||
|
||||
// Lock should be held entering here.
|
||||
func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) error {
|
||||
func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error {
|
||||
// We support basic user/pass and operator based user JWT with signatures.
|
||||
cinfo := leafConnectInfo{
|
||||
TLS: tlsRequired,
|
||||
Name: c.srv.info.ID,
|
||||
Hub: c.leaf.remote.Hub,
|
||||
Cluster: clusterName,
|
||||
Headers: headers,
|
||||
}
|
||||
|
||||
// Check for credentials first, that will take precedence..
|
||||
@@ -691,6 +692,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
s.generateNonce(nonce[:])
|
||||
}
|
||||
clusterName := s.info.Cluster
|
||||
headers := s.supportsHeaders()
|
||||
s.mu.Unlock()
|
||||
|
||||
// Grab lock
|
||||
@@ -825,7 +827,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.sendLeafConnect(clusterName, tlsRequired); err != nil {
|
||||
if err := c.sendLeafConnect(clusterName, tlsRequired, headers); err != nil {
|
||||
c.mu.Unlock()
|
||||
c.closeConnection(ProtocolViolation)
|
||||
return nil
|
||||
@@ -888,7 +890,6 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
|
||||
// Leaf nodes will always require a CONNECT to let us know
|
||||
// when we are properly bound to an account.
|
||||
// The connection may have been closed
|
||||
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
|
||||
}
|
||||
|
||||
@@ -1161,6 +1162,7 @@ type leafConnectInfo struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Hub bool `json:"is_hub,omitempty"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
Headers bool `json:"headers,omitempty"`
|
||||
|
||||
// Just used to detect wrong connection attempts.
|
||||
Gateway string `json:"gateway,omitempty"`
|
||||
@@ -1194,11 +1196,18 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
return ErrWrongGateway
|
||||
}
|
||||
|
||||
// Check if this server supports headers.
|
||||
supportHeaders := c.srv.supportsHeaders()
|
||||
|
||||
c.mu.Lock()
|
||||
// Leaf Nodes do not do echo or verbose or pedantic.
|
||||
c.opts.Verbose = false
|
||||
c.opts.Echo = false
|
||||
c.opts.Pedantic = false
|
||||
// This inbound connection will be marked as supporting headers if this server
|
||||
// support headers and the remote has sent in the CONNECT protocol that it does
|
||||
// support headers too.
|
||||
c.headers = supportHeaders && proto.Headers
|
||||
|
||||
// If the other side has declared itself a hub, so we will take on the spoke role.
|
||||
if proto.Hub {
|
||||
|
||||
@@ -4240,3 +4240,61 @@ func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
|
||||
t.Fatalf("Did not receive message: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafnodeHeaders(t *testing.T) {
|
||||
srv, opts := runLeafServer()
|
||||
defer srv.Shutdown()
|
||||
leaf, _ := runSolicitLeafServer(opts)
|
||||
defer leaf.Shutdown()
|
||||
|
||||
snc, err := nats.Connect(srv.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
defer snc.Close()
|
||||
ssub, err := snc.SubscribeSync("test")
|
||||
if err != nil {
|
||||
t.Fatalf("subscribe failed: %s", err)
|
||||
}
|
||||
|
||||
lnc, err := nats.Connect(leaf.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
defer lnc.Close()
|
||||
lsub, err := lnc.SubscribeSync("test")
|
||||
if err != nil {
|
||||
t.Fatalf("subscribe failed: %s", err)
|
||||
}
|
||||
lnc.Flush()
|
||||
|
||||
checkLeafNodeConnected(t, srv)
|
||||
checkLeafNodeConnected(t, leaf)
|
||||
checkSubInterest(t, srv, "$G", "test", time.Second)
|
||||
|
||||
msg := nats.NewMsg("test")
|
||||
msg.Header.Add("Test", "Header")
|
||||
if len(msg.Header) == 0 {
|
||||
t.Fatalf("msg header is empty")
|
||||
}
|
||||
err = snc.PublishMsg(msg)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
smsg, err := ssub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("next failed: %s", err)
|
||||
}
|
||||
if len(smsg.Header) == 0 {
|
||||
t.Fatalf("server msgs header is empty")
|
||||
}
|
||||
|
||||
lmsg, err := lsub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("next failed: %s", err)
|
||||
}
|
||||
if len(lmsg.Header) == 0 {
|
||||
t.Fatalf("leaf msg header is empty")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user