diff --git a/server/accounts.go b/server/accounts.go index 17664797..052abb2a 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -500,6 +500,18 @@ func (a *Account) SubscriptionInterest(subject string) bool { return interest } +// Interest returns the number of subscriptions for a given subject that match. +func (a *Account) Interest(subject string) int { + var nms int + a.mu.RLock() + if a.sl != nil { + res := a.sl.Match(subject) + nms = len(res.psubs) + len(res.qsubs) + } + a.mu.RUnlock() + return nms +} + // addClient keeps our accounting of local active clients or leafnodes updated. // Returns previous total. func (a *Account) addClient(c *client) int { diff --git a/server/client.go b/server/client.go index f70c2db0..e9e23516 100644 --- a/server/client.go +++ b/server/client.go @@ -402,6 +402,7 @@ type subscription struct { subject []byte queue []byte sid []byte + origin []byte nm int64 max int64 qw int32 @@ -2559,16 +2560,28 @@ func (c *client) checkDenySub(subject string) bool { return false } -// Create a message header for routes or leafnodes. Header aware. +// Create a message header for routes or leafnodes. Header and origin cluster aware. func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, acc *Account) []byte { hasHeader := c.pa.hdr > 0 canReceiveHeader := rt.sub.client.headers - kind := rt.sub.client.kind mh := c.msgb[:msgHeadProtoLen] + kind := rt.sub.client.kind + var lnoc bool + if kind == ROUTER { - // Router (and Gateway) nodes are RMSG. Set here since leafnodes may rewrite. - mh[0] = 'R' + // If we are coming from a leaf with an origin cluster we need to handle differently + // if we can. We will send a route based LMSG which has origin cluster and headers + // by default. + if c.kind == LEAF && c.remoteCluster() != _EMPTY_ && rt.sub.client.route.lnoc { + mh[0] = 'L' + mh = append(mh, c.remoteCluster()...) + mh = append(mh, ' ') + lnoc = true + } else { + // Router (and Gateway) nodes are RMSG. 
Set here since leafnodes may rewrite. + mh[0] = 'R' + } mh = append(mh, acc.Name...) mh = append(mh, ' ') } else { @@ -2595,7 +2608,17 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac mh = append(mh, reply...) mh = append(mh, ' ') } - if hasHeader { + + if lnoc { + // leafnode origin LMSG always have a header entry even if zero. + if c.pa.hdr <= 0 { + mh = append(mh, '0') + } else { + mh = append(mh, c.pa.hdb...) + } + mh = append(mh, ' ') + mh = append(mh, c.pa.szb...) + } else if hasHeader { if canReceiveHeader { mh[0] = 'H' mh = append(mh, c.pa.hdb...) @@ -2609,8 +2632,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac } else { mh = append(mh, c.pa.szb...) } - mh = append(mh, _CRLF_...) - return mh + return append(mh, _CRLF_...) } // Create a message header for clients. Header aware. @@ -3672,6 +3694,25 @@ sendToRoutesOrLeafs: // We have inline structs for memory layout and cache coherency. for i := range c.in.rts { rt := &c.in.rts[i] + // Check if we have an origin cluster set from a leafnode message. + // If so make sure we do not send it back to the same cluster for a different + // leafnode. Cluster wide no echo. + if rt.sub.client.kind == LEAF { + // Check two scenarios. One is inbound from a route (c.pa.origin) + if c.kind == ROUTER && len(c.pa.origin) > 0 { + if string(c.pa.origin) == rt.sub.client.remoteCluster() { + continue + } + } + // The other is leaf to leaf. + if c.kind == LEAF { + src, dest := c.remoteCluster(), rt.sub.client.remoteCluster() + if src != _EMPTY_ && src == dest { + continue + } + } + } + mh := c.msgHeaderForRouteOrLeaf(subject, reply, rt, acc) didDeliver = c.deliverMsg(rt.sub, subject, reply, mh, msg, false) || didDeliver } diff --git a/server/const.go b/server/const.go index 8a5024e4..976b73ac 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. 
- VERSION = "2.2.0-beta.17" + VERSION = "2.2.0-beta.18" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/leafnode.go b/server/leafnode.go index dedd78a8..69e20a45 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -52,15 +52,17 @@ const leafNodeReconnectAfterPermViolation = 30 * time.Second const leafNodeLoopDetectionSubjectPrefix = "$LDS." type leaf struct { - // Used to suppress sub and unsub interest. Same as routes but our audience - // here is tied to this leaf node. This will hold all subscriptions except this - // leaf nodes. This represents all the interest we want to send to the other side. - smap map[string]int32 // We have any auth stuff here for solicited connections. remote *leafNodeCfg // isSpoke tells us what role we are playing. // Used when we receive a connection but otherside tells us they are a hub. isSpoke bool + // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster. + remoteCluster string + // Used to suppress sub and unsub interest. Same as routes but our audience + // here is tied to this leaf node. This will hold all subscriptions except this + // leaf nodes. This represents all the interest we want to send to the other side. + smap map[string]int32 // This map will contain all the subscriptions that have been added to the smap // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid // race between processing of a sub where sub is added to account sublist but @@ -452,12 +454,13 @@ func (s *Server) leafNodeAcceptLoop(ch chan struct{}) { var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`) // Lock should be held entering here. -func (c *client) sendLeafConnect(tlsRequired bool) { +func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) { // We support basic user/pass and operator based user JWT with signatures. 
cinfo := leafConnectInfo{ - TLS: tlsRequired, - Name: c.srv.info.ID, - Hub: c.leaf.remote.Hub, + TLS: tlsRequired, + Name: c.srv.info.ID, + Hub: c.leaf.remote.Hub, + Cluster: clusterName, } // Check for credentials first, that will take precedence.. @@ -648,6 +651,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { if !solicited { s.generateNonce(nonce[:]) } + clusterName := s.info.Cluster s.mu.Unlock() // Grab lock @@ -750,7 +754,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { c.mu.Lock() } - c.sendLeafConnect(tlsRequired) + c.sendLeafConnect(clusterName, tlsRequired) c.Debugf("Remote leafnode connect msg sent") } else { @@ -1025,14 +1029,16 @@ func (s *Server) removeLeafNodeConnection(c *client) { } type leafConnectInfo struct { - JWT string `json:"jwt,omitempty"` - Sig string `json:"sig,omitempty"` - User string `json:"user,omitempty"` - Pass string `json:"pass,omitempty"` - TLS bool `json:"tls_required"` - Comp bool `json:"compression,omitempty"` - Name string `json:"name,omitempty"` - Hub bool `json:"is_hub,omitempty"` + JWT string `json:"jwt,omitempty"` + Sig string `json:"sig,omitempty"` + User string `json:"user,omitempty"` + Pass string `json:"pass,omitempty"` + TLS bool `json:"tls_required"` + Comp bool `json:"compression,omitempty"` + Name string `json:"name,omitempty"` + Hub bool `json:"is_hub,omitempty"` + Cluster string `json:"cluster,omitempty"` + // Just used to detect wrong connection attempts. Gateway string `json:"gateway,omitempty"` } @@ -1075,6 +1081,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.leaf.isSpoke = true } + // The soliciting side is part of a cluster. + if proto.Cluster != "" { + c.leaf.remoteCluster = proto.Cluster + } + // If we have permissions bound to this leafnode we need to send then back to the // origin server for local enforcement. 
s.sendPermsInfo(c) @@ -1093,6 +1104,14 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return nil } +// Returns the remote cluster name. This is set only once so does not require a lock. +func (c *client) remoteCluster() string { + if c.leaf == nil { + return "" + } + return c.leaf.remoteCluster +} + // Sends back an info block to the soliciting leafnode to let it know about // its permission settings for local enforcement. func (s *Server) sendPermsInfo(c *client) { @@ -1262,6 +1281,10 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { acc.mu.RUnlock() for _, ln := range leafs { + // Check to make sure this sub does not have an origin cluster than matches the leafnode. + if sub.origin != nil && string(sub.origin) == ln.remoteCluster() { + continue + } ln.updateSmap(sub, delta) } } @@ -1453,6 +1476,11 @@ func (c *client) processLeafSub(argo []byte) (err error) { return nil } + // If we have an origin cluster associated mark that in the sub. + if rc := c.remoteCluster(); rc != _EMPTY_ { + sub.origin = []byte(rc) + } + // Like Routes, we store local subs by account and subject and optionally queue name. // If we have a queue it will have a trailing weight which we do not want. if sub.queue != nil { @@ -1847,3 +1875,16 @@ func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, t c.mu.Unlock() return accName, delay } + +// updatedSolicitedLeafnodes will disconnect any solicited leafnodes such +// that the reconnect will establish the proper origin cluster for the hub. 
+func (s *Server) updatedSolicitedLeafnodes() { + for _, c := range s.leafs { + c.mu.Lock() + shouldClose := c.leaf != nil && c.leaf.remote != nil + c.mu.Unlock() + if shouldClose { + c.closeConnection(ClusterNameConflict) + } + } +} diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 96bf157a..29177e63 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -787,10 +787,12 @@ func TestLeafNodeLoop(t *testing.T) { func TestLeafNodeLoopFromDAG(t *testing.T) { // We want B & C to point to A, A itself does not point to any other server. + // We need to cancel clustering since now this will suppress on its own. oa := DefaultOptions() oa.ServerName = "A" oa.LeafNode.ReconnectInterval = 10 * time.Millisecond oa.LeafNode.Port = -1 + oa.Cluster = ClusterOpts{} sa := RunServer(oa) defer sa.Shutdown() @@ -802,6 +804,7 @@ func TestLeafNodeLoopFromDAG(t *testing.T) { ob.LeafNode.ReconnectInterval = 10 * time.Millisecond ob.LeafNode.Port = -1 ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}} + ob.Cluster = ClusterOpts{} sb := RunServer(ob) defer sb.Shutdown() @@ -816,6 +819,7 @@ func TestLeafNodeLoopFromDAG(t *testing.T) { oc.LeafNode.ReconnectInterval = 10 * time.Millisecond oc.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}, {URLs: []*url.URL{ub}}} oc.LeafNode.connDelay = 100 * time.Millisecond // Allow logger to be attached before connecting. 
+ oc.Cluster = ClusterOpts{} sc := RunServer(oc) lc := &captureErrorLogger{errCh: make(chan string, 10)} @@ -1460,7 +1464,6 @@ func TestLeafNodeTmpClients(t *testing.T) { } func TestLeafNodeTLSVerifyAndMap(t *testing.T) { - accName := "MyAccount" acc := NewAccount(accName) certUserName := "CN=example.com,OU=NATS.io" @@ -1554,3 +1557,95 @@ func TestLeafNodeTLSVerifyAndMap(t *testing.T) { }) } } + +func TestLeafNodeOriginClusterInfo(t *testing.T) { + hopts := DefaultOptions() + hopts.ServerName = "hub" + hopts.LeafNode.Port = -1 + + hub := RunServer(hopts) + defer hub.Shutdown() + + conf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leaf { + remotes [ { url: "nats://127.0.0.1:%d" } ] + } + `, hopts.LeafNode.Port))) + + defer os.Remove(conf) + opts, err := ProcessConfigFile(conf) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + opts.NoLog, opts.NoSigs = true, true + + s := RunServer(opts) + defer s.Shutdown() + + checkLeafNodeConnected(t, s) + + // Check the info on the leadnode client in the hub. + grabLeaf := func() *client { + var l *client + hub.mu.Lock() + for _, l = range hub.leafs { + break + } + hub.mu.Unlock() + return l + } + + l := grabLeaf() + if rc := l.remoteCluster(); rc != "" { + t.Fatalf("Expected an empty remote cluster, got %q", rc) + } + + s.Shutdown() + + // Now make our leafnode part of a cluster. 
+ conf = createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leaf { + remotes [ { url: "nats://127.0.0.1:%d" } ] + } + cluster { + name: "abc" + listen: "127.0.0.1:-1" + } + `, hopts.LeafNode.Port))) + + defer os.Remove(conf) + opts, err = ProcessConfigFile(conf) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + opts.NoLog, opts.NoSigs = true, true + + s = RunServer(opts) + defer s.Shutdown() + + checkLeafNodeConnected(t, s) + + l = grabLeaf() + if rc := l.remoteCluster(); rc != "abc" { + t.Fatalf("Expected a remote cluster name of \"abc\", got %q", rc) + } + pcid := l.cid + + // Now make sure that if we update our cluster name, simulating the settling + // of dynamic cluster names between competing servers. + s.setClusterName("xyz") + // Make sure we disconnect and reconnect. + checkLeafNodeConnectedCount(t, s, 0) + checkLeafNodeConnected(t, s) + + l = grabLeaf() + if rc := l.remoteCluster(); rc != "xyz" { + t.Fatalf("Expected a remote cluster name of \"xyz\", got %q", rc) + } + // Make sure we reconnected and have a new CID. 
+ if l.cid == pcid { + t.Fatalf("Expected a different id, got the same") + } +} diff --git a/server/monitor.go b/server/monitor.go index c4345ab5..3b3756f4 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1902,6 +1902,11 @@ func (reason ClosedState) String() string { return "Internal Client" case MsgHeaderViolation: return "Message Header Violation" + case NoRespondersRequiresHeaders: + return "No Responders Requires Headers" + case ClusterNameConflict: + return "Cluster Name Conflict" } + return "Unknown State" } diff --git a/server/parser.go b/server/parser.go index 255007a9..4595776b 100644 --- a/server/parser.go +++ b/server/parser.go @@ -20,6 +20,7 @@ import ( type parserState int type parseState struct { state parserState + op byte as int drop int pa pubArg @@ -31,6 +32,7 @@ type parseState struct { type pubArg struct { arg []byte pacache []byte + origin []byte account []byte subject []byte deliver []byte @@ -142,6 +144,7 @@ func (c *client) parse(buf []byte) error { switch c.state { case OP_START: + c.op = b if b != 'C' && b != 'c' { if authSet { goto authErr @@ -170,7 +173,7 @@ func (c *client) parse(buf []byte) error { c.state = OP_R } case 'L', 'l': - if c.kind != LEAF { + if c.kind != LEAF && c.kind != ROUTER { goto parseErr } else { c.state = OP_L @@ -442,7 +445,7 @@ func (c *client) parse(buf []byte) error { c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args - c.pa.arg, c.pa.pacache, c.pa.account, c.pa.subject = nil, nil, nil, nil + c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil case OP_A: switch b { @@ -579,10 +582,18 @@ func (c *client) parse(buf []byte) error { } _, err = c.processSub(arg, false) case ROUTER: - if trace { - c.traceInOp("RS+", arg) + switch c.op { + case 'R', 'r': + if trace { + c.traceInOp("RS+", arg) + } + err = c.processRemoteSub(arg, false) + 
case 'L', 'l': + if trace { + c.traceInOp("LS+", arg) + } + err = c.processRemoteSub(arg, true) } - err = c.processRemoteSub(arg) case GATEWAY: if trace { c.traceInOp("RS+", arg) @@ -895,10 +906,18 @@ func (c *client) parse(buf []byte) error { } var err error if c.kind == ROUTER || c.kind == GATEWAY { - if trace { - c.traceInOp("RMSG", arg) + switch c.op { + case 'R', 'r': + if trace { + c.traceInOp("RMSG", arg) + } + err = c.processRoutedMsgArgs(arg) + case 'L', 'l': + if trace { + c.traceInOp("LMSG", arg) + } + err = c.processRoutedOriginClusterMsgArgs(arg) } - err = c.processRoutedMsgArgs(arg) } else if c.kind == LEAF { if trace { c.traceInOp("LMSG", arg) diff --git a/server/route.go b/server/route.go index ff78e214..645adcdc 100644 --- a/server/route.go +++ b/server/route.go @@ -56,6 +56,8 @@ var ( aUnsubBytes = []byte{'A', '-', ' '} rSubBytes = []byte{'R', 'S', '+', ' '} rUnsubBytes = []byte{'R', 'S', '-', ' '} + lSubBytes = []byte{'L', 'S', '+', ' '} + lUnsubBytes = []byte{'L', 'S', '-', ' '} ) // Used by tests @@ -68,6 +70,7 @@ type route struct { remoteName string didSolicit bool retry bool + lnoc bool routeType RouteType url *url.URL authRequired bool @@ -91,6 +94,7 @@ type connectInfo struct { Name string `json:"name"` Cluster string `json:"cluster"` Dynamic bool `json:"cluster_dynamic,omitempty"` + LNOC bool `json:"lnoc,omitempty"` Gateway string `json:"gateway,omitempty"` } @@ -158,6 +162,96 @@ func (c *client) processAccountUnsub(arg []byte) { } } +// Process an inbound LMSG specification from the remote route. This means +// we have an origin cluster and we force header semantics. 
+func (c *client) processRoutedOriginClusterMsgArgs(arg []byte) error { + // Unroll splitArgs to avoid runtime/heap issues + a := [MAX_HMSG_ARGS + 1][]byte{} + args := a[:0] + start := -1 + for i, b := range arg { + switch b { + case ' ', '\t', '\r', '\n': + if start >= 0 { + args = append(args, arg[start:i]) + start = -1 + } + default: + if start < 0 { + start = i + } + } + } + if start >= 0 { + args = append(args, arg[start:]) + } + + c.pa.arg = arg + switch len(args) { + case 0, 1, 2, 3, 4: + return fmt.Errorf("processRoutedOriginClusterMsgArgs Parse Error: '%s'", args) + case 5: + c.pa.reply = nil + c.pa.queues = nil + c.pa.hdb = args[3] + c.pa.hdr = parseSize(args[3]) + c.pa.szb = args[4] + c.pa.size = parseSize(args[4]) + case 6: + c.pa.reply = args[3] + c.pa.queues = nil + c.pa.hdb = args[4] + c.pa.hdr = parseSize(args[4]) + c.pa.szb = args[5] + c.pa.size = parseSize(args[5]) + default: + // args[2] is our reply indicator. Should be + or | normally. + if len(args[3]) != 1 { + return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Reply Indicator: '%s'", args[3]) + } + switch args[3][0] { + case '+': + c.pa.reply = args[4] + case '|': + c.pa.reply = nil + default: + return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Reply Indicator: '%s'", args[3]) + } + + // Grab header size. + c.pa.hdb = args[len(args)-2] + c.pa.hdr = parseSize(c.pa.hdb) + + // Grab size. + c.pa.szb = args[len(args)-1] + c.pa.size = parseSize(c.pa.szb) + + // Grab queue names. 
+ if c.pa.reply != nil { + c.pa.queues = args[5 : len(args)-2] + } else { + c.pa.queues = args[4 : len(args)-2] + } + } + if c.pa.hdr < 0 { + return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Header Size: '%s'", arg) + } + if c.pa.size < 0 { + return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Size: '%s'", args) + } + if c.pa.hdr > c.pa.size { + return fmt.Errorf("processRoutedOriginClusterMsgArgs Header Size larger then TotalSize: '%s'", arg) + } + + // Common ones processed after check for arg length + c.pa.origin = args[0] + c.pa.account = args[1] + c.pa.subject = args[2] + c.pa.pacache = arg[len(args[0])+1 : len(args[0])+len(args[1])+len(args[2])+2] + + return nil +} + // Process an inbound HMSG specification from the remote route. func (c *client) processRoutedHeaderMsgArgs(arg []byte) error { // Unroll splitArgs to avoid runtime/heap issues @@ -245,7 +339,7 @@ func (c *client) processRoutedHeaderMsgArgs(arg []byte) error { return nil } -// Process an inbound RMSG specification from the remote route. +// Process an inbound RMSG or LMSG specification from the remote route. func (c *client) processRoutedMsgArgs(arg []byte) error { // Unroll splitArgs to avoid runtime/heap issues a := [MAX_RMSG_ARGS][]byte{} @@ -371,6 +465,7 @@ func (c *client) sendRouteConnect(clusterName string, tlsRequired bool) { Headers: s.supportsHeaders(), Cluster: clusterName, Dynamic: s.isClusterNameDynamic(), + LNOC: true, } b, err := json.Marshal(cinfo) @@ -497,6 +592,8 @@ func (c *client) processRouteInfo(info *Info) { c.route.tlsRequired = info.TLSRequired c.route.gatewayURL = info.GatewayURL c.route.remoteName = info.Name + c.route.lnoc = info.LNOC + // When sent through route INFO, if the field is set, it should be of size 1. 
if len(info.LeafNodeURLs) == 1 { c.route.leafnodeURL = info.LeafNodeURLs[0] @@ -859,7 +956,7 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { return nil } -func (c *client) processRemoteSub(argo []byte) (err error) { +func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { // Indicate activity. c.in.subs++ @@ -875,21 +972,27 @@ func (c *client) processRemoteSub(argo []byte) (err error) { args := splitArg(arg) sub := &subscription{client: c} + var off int + if hasOrigin { + off = 1 + sub.origin = args[0] + } + switch len(args) { - case 2: + case 2 + off: sub.queue = nil - case 4: - sub.queue = args[2] - sub.qw = int32(parseSize(args[3])) + case 4 + off: + sub.queue = args[2+off] + sub.qw = int32(parseSize(args[3+off])) default: return fmt.Errorf("processRemoteSub Parse Error: '%s'", arg) } - sub.subject = args[1] + sub.subject = args[1+off] // Lookup the account // FIXME(dlc) - This may start having lots of contention? - accountName := string(args[0]) - acc, _ := c.srv.LookupAccount(accountName) + accountName := string(args[0+off]) + acc, _ := srv.LookupAccount(accountName) if acc == nil { if !srv.NewAccountsAllowed() { c.Debugf("Unknown account %q for subject %q", accountName, sub.subject) @@ -921,11 +1024,12 @@ func (c *client) processRemoteSub(argo []byte) (err error) { // We store local subs by account and subject and optionally queue name. // If we have a queue it will have a trailing weight which we do not want. if sub.queue != nil { - sub.sid = arg[:len(arg)-len(args[3])-1] + sub.sid = arg[:len(arg)-len(args[3+off])-1] } else { sub.sid = arg } key := string(sub.sid) + osub := c.subs[key] updateGWs := false if osub == nil { @@ -1055,6 +1159,7 @@ func (c *client) sendRouteUnSubProtos(subs []*subscription, trace bool, filter f } // Low-level function that sends RS+ or RS- protocols for the given subscriptions. +// This can now also send LS+ and LS- for origin cluster based leafnode subscriptions for cluster no-echo. 
// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity. // Lock is held on entry. func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, trace bool, filter func(sub *subscription) bool) { @@ -1086,10 +1191,23 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra } as := len(buf) - if isSubProto { - buf = append(buf, rSubBytes...) + + // If we have an origin cluster and the other side supports leafnode origin clusters + // send an LS+/LS- version instead. + if len(sub.origin) > 0 && c.route.lnoc { + if isSubProto { + buf = append(buf, lSubBytes...) + } else { + buf = append(buf, lUnsubBytes...) + } + buf = append(buf, sub.origin...) + buf = append(buf, ' ') } else { - buf = append(buf, rUnsubBytes...) + if isSubProto { + buf = append(buf, rSubBytes...) + } else { + buf = append(buf, rUnsubBytes...) + } } buf = append(buf, accName...) buf = append(buf, ' ') @@ -1114,6 +1232,7 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra } buf = append(buf, CR_LF...) } + c.queueOutbound(buf) c.flushSignal() } @@ -1381,7 +1500,7 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del // Not required for code correctness, but helps reduce the number of // updates sent to the routes when processing high number of concurrent // queue subscriptions updates (sub/unsub). - // See https://github.com/nats-io/nats-server/pull/1126 ffor more details. + // See https://github.com/nats-io/nats-server/pull/1126 for more details. if isq { acc.sqmu.Lock() } @@ -1551,6 +1670,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { GatewayURL: s.getGatewayURL(), Headers: s.supportsHeaders(), Cluster: s.info.Cluster, + LNOC: true, } // Set this if only if advertise is not disabled if !opts.Cluster.NoAdvertise { @@ -1807,6 +1927,7 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error // Grab connection name of remote route. 
c.mu.Lock() c.route.remoteID = c.opts.Name + c.route.lnoc = proto.LNOC c.setRoutePermissions(perms) c.headers = supportsHeaders && proto.Headers c.mu.Unlock() diff --git a/server/server.go b/server/server.go index e22514e0..c5f7de10 100644 --- a/server/server.go +++ b/server/server.go @@ -83,6 +83,7 @@ type Info struct { // Route Specific Import *SubjectPermission `json:"import,omitempty"` Export *SubjectPermission `json:"export,omitempty"` + LNOC bool `json:"lnoc,omitempty"` // Gateways Specific Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) @@ -434,11 +435,13 @@ func (s *Server) setClusterName(name string) { } s.info.Cluster = name s.routeInfo.Cluster = name + s.updatedSolicitedLeafnodes() s.mu.Unlock() if resetCh != nil { resetCh <- struct{}{} } s.Noticef("Cluster name updated to %s", name) + } // Return whether the cluster name is dynamic. diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 59e44dd7..363bc019 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -3145,7 +3145,6 @@ func TestLeafNodeCycleWithSolicited(t *testing.T) { atomic.AddInt32(&requestsReceived, 1) m.Respond([]byte("22")) }) - nc.Flush() nc = clientForCluster(t, cb) defer nc.Close() @@ -3153,12 +3152,35 @@ func TestLeafNodeCycleWithSolicited(t *testing.T) { atomic.AddInt32(&requestsReceived, 1) m.Respond([]byte("33")) }) - nc.Flush() // Soliciting cluster, both solicited connected to the "A" cluster sc := runSolicitLeafCluster(t, "SC", ca, ca) defer shutdownCluster(sc) + checkInterest := func(s *server.Server, subject string) bool { + t.Helper() + acc, _ := s.LookupAccount("$G") + return acc.SubscriptionInterest(subject) + } + + waitForInterest := func(subject string, servers ...*server.Server) { + t.Helper() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + for _, s := range servers { + if !checkInterest(s, subject) { + return fmt.Errorf("No interest") + } + } + return nil + }) + } + + 
waitForInterest("request", + sc.servers[0], sc.servers[1], + ca.servers[0], ca.servers[1], ca.servers[2], + cb.servers[0], cb.servers[1], cb.servers[2], + ) + // Connect a client to a random server in sc createClientAndRequest := func(c *cluster) (*nats.Conn, *nats.Subscription) { nc := clientForCluster(t, c) @@ -3762,3 +3784,237 @@ func TestLeafNodeQueueSubscriberUnsubscribe(t *testing.T) { // Make sure we receive nothing... expectNothing(t, lc) } + +func TestLeafNodeOriginClusterSingleHub(t *testing.T) { + s, opts := runLeafServer() + defer s.Shutdown() + + c1 := ` + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, opts.LeafNode.Port))) + defer os.Remove(lconf1) + + ln1, lopts1 := RunServerWithConfig(lconf1) + defer ln1.Shutdown() + + c2 := ` + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, opts.LeafNode.Port))) + defer os.Remove(lconf2) + + ln2, _ := RunServerWithConfig(lconf2) + defer ln2.Shutdown() + + ln3, _ := RunServerWithConfig(lconf2) + defer ln3.Shutdown() + + checkClusterFormed(t, ln1, ln2, ln3) + checkLeafNodeConnections(t, s, 3) + + // So now we are setup with 3 solicited leafnodes all connected to a hub. + // We will create two clients, one on each leafnode server. 
+ nc1, err := nats.Connect(ln1.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + nc2, err := nats.Connect(ln2.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + checkInterest := func(s *server.Server, subject string) bool { + t.Helper() + acc, _ := s.LookupAccount("$G") + return acc.SubscriptionInterest(subject) + } + + waitForInterest := func(subject string, servers ...*server.Server) { + t.Helper() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + for _, s := range servers { + if !checkInterest(s, subject) { + return fmt.Errorf("No interest") + } + } + return nil + }) + } + + subj := "foo.bar" + + sub, _ := nc2.SubscribeSync(subj) + waitForInterest(subj, ln1, ln2, ln3, s) + + // Make sure we truncated the subscription bouncing through the hub and back to other leafnodes. + for _, s := range []*server.Server{ln1, ln3} { + acc, _ := s.LookupAccount("$G") + if nms := acc.Interest(subj); nms != 1 { + t.Fatalf("Expected only one active subscription, got %d", nms) + } + } + + // Send a message. + nc1.Publish(subj, nil) + nc1.Flush() + // Wait to propagate + time.Sleep(25 * time.Millisecond) + + // Make sure we only get it once. 
+ if n, _, _ := sub.Pending(); n != 1 { + t.Fatalf("Expected only one message, got %d", n) + } +} + +func TestLeafNodeOriginCluster(t *testing.T) { + ca := createClusterWithName(t, "A", 3) + defer shutdownCluster(ca) + + c1 := ` + server_name: L1 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, ca.opts[0].LeafNode.Port))) + defer os.Remove(lconf1) + + ln1, lopts1 := RunServerWithConfig(lconf1) + defer ln1.Shutdown() + + c2 := ` + server_name: L2 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, ca.opts[1].LeafNode.Port))) + defer os.Remove(lconf2) + + ln2, _ := RunServerWithConfig(lconf2) + defer ln2.Shutdown() + + c3 := ` + server_name: L3 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, ca.opts[2].LeafNode.Port))) + defer os.Remove(lconf3) + + ln3, _ := RunServerWithConfig(lconf3) + defer ln3.Shutdown() + + checkClusterFormed(t, ln1, ln2, ln3) + checkLeafNodeConnections(t, ca.servers[0], 1) + checkLeafNodeConnections(t, ca.servers[1], 1) + checkLeafNodeConnections(t, ca.servers[2], 1) + + // So now we are setup with 3 solicited leafnodes connected to different servers in the hub cluster. + // We will create two clients, one on each leafnode server. 
+ nc1, err := nats.Connect(ln1.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + nc2, err := nats.Connect(ln2.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + checkInterest := func(s *server.Server, subject string) bool { + t.Helper() + acc, _ := s.LookupAccount("$G") + return acc.SubscriptionInterest(subject) + } + + waitForInterest := func(subject string, servers ...*server.Server) { + t.Helper() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + for _, s := range servers { + if !checkInterest(s, subject) { + return fmt.Errorf("No interest") + } + } + return nil + }) + } + + subj := "foo.bar" + + sub, _ := nc2.SubscribeSync(subj) + waitForInterest(subj, ln1, ln2, ln3, ca.servers[0], ca.servers[1], ca.servers[2]) + + // Make sure we truncated the subscription bouncing through the hub and back to other leafnodes. + for _, s := range []*server.Server{ln1, ln3} { + acc, _ := s.LookupAccount("$G") + if nms := acc.Interest(subj); nms != 1 { + t.Fatalf("Expected only one active subscription, got %d", nms) + } + } + + // Send a message. + nc1.Publish(subj, nil) + nc1.Flush() + // Wait to propagate + time.Sleep(25 * time.Millisecond) + + // Make sure we only get it once. + if n, _, _ := sub.Pending(); n != 1 { + t.Fatalf("Expected only one message, got %d", n) + } + // eat the msg + sub.NextMsg(time.Second) + + // Now create interest on the hub side. This will draw the message from a leafnode + // to the hub. We want to make sure that message does not bounce back to other leafnodes. + nc3, err := nats.Connect(ca.servers[0].ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc3.Close() + + wcSubj := "foo.*" + nc3.SubscribeSync(wcSubj) + // This is a placeholder that we can use to check all interest has propagated. 
+ nc3.SubscribeSync("bar")
+ waitForInterest("bar", ln1, ln2, ln3, ca.servers[0], ca.servers[1], ca.servers[2])
+
+ // Send another message.
+ m := nats.NewMsg(subj)
+ m.Header.Add("Accept-Encoding", "json")
+ m.Header.Add("Authorization", "s3cr3t")
+ m.Data = []byte("Hello Headers!")
+
+ nc1.PublishMsg(m)
+ nc1.Flush()
+ // Wait to propagate
+ time.Sleep(25 * time.Millisecond)
+
+ // Make sure we only get it once.
+ if n, _, _ := sub.Pending(); n != 1 {
+ t.Fatalf("Expected only one message, got %d", n)
+ }
+ // grab the msg
+ msg, _ := sub.NextMsg(time.Second)
+ if !bytes.Equal(m.Data, msg.Data) {
+ t.Fatalf("Expected the payloads to match, wanted %q, got %q", m.Data, msg.Data)
+ }
+ if len(msg.Header) != 2 {
+ t.Fatalf("Expected 2 header entries, got %d", len(msg.Header))
+ }
+ if msg.Header.Get("Authorization") != "s3cr3t" {
+ t.Fatalf("Expected auth header to match, wanted %q, got %q", "s3cr3t", msg.Header.Get("Authorization"))
+ }
+}
diff --git a/test/new_routes_test.go b/test/new_routes_test.go
index b50ad7ea..fbd1002b 100644
--- a/test/new_routes_test.go
+++ b/test/new_routes_test.go
@@ -17,6 +17,7 @@ import (
 "encoding/json"
 "fmt"
 "net"
+ "os"
 "testing"
 "time"

@@ -54,6 +55,10 @@ func TestNewRouteInfoOnConnect(t *testing.T) {
 if !info.Headers {
 t.Fatalf("Expected to have headers on by default")
 }
+ // Leafnode origin cluster support.
+ if !info.LNOC {
+ t.Fatalf("Expected to have leafnode origin cluster support")
+ }
 }

 func TestNewRouteHeaderSupport(t *testing.T) {
@@ -825,7 +830,7 @@ func TestNewRouteProcessRoutedMsgs(t *testing.T) {
 matches := expectMsgs(1)
 checkMsg(t, matches[0], "foo", "1", "", "2", "ok")

- // Now send in a RMSG to the route witha reply and make sure its delivered to the client.
+ // Now send in a RMSG to the route with a reply and make sure it's delivered to the client. 
routeSend("RMSG $G foo reply 2\r\nok\r\nPING\r\n")
routeExpect(pongRe)

@@ -1715,3 +1720,116 @@ func TestNewRouteLargeDistinctQueueSubscribers(t *testing.T) {
return nil
})
}
+
+func TestNewRouteLeafNodeOriginSupport(t *testing.T) {
+ content := `
+ listen: 127.0.0.1:-1
+ cluster { name: xyz, listen: 127.0.0.1:-1 }
+ leafnodes { listen: 127.0.0.1:-1 }
+ no_sys_acc: true
+ `
+ conf := createConfFile(t, []byte(content))
+ defer os.Remove(conf)
+
+ s, opts := RunServerWithConfig(conf)
+ defer s.Shutdown()
+
+ gacc, _ := s.LookupAccount("$G")
+
+ lcontent := `
+ listen: 127.0.0.1:-1
+ cluster { name: ln1, listen: 127.0.0.1:-1 }
+ leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
+ no_sys_acc: true
+ `
+ lconf := createConfFile(t, []byte(fmt.Sprintf(lcontent, opts.LeafNode.Port)))
+ defer os.Remove(lconf)
+
+ ln, _ := RunServerWithConfig(lconf)
+ defer ln.Shutdown()
+
+ checkLeafNodeConnected(t, s)
+
+ lgacc, _ := ln.LookupAccount("$G")
+
+ rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
+ defer rc.Close()
+
+ routeID := "LNOC:22"
+ routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
+
+ pingPong := func() {
+ t.Helper()
+ routeSend("PING\r\n")
+ routeExpect(pongRe)
+ }
+
+ info := checkInfoMsg(t, rc)
+ info.ID = routeID
+ info.LNOC = true
+ b, err := json.Marshal(info)
+ if err != nil {
+ t.Fatalf("Could not marshal test route info: %v", err)
+ }
+
+ routeSend(fmt.Sprintf("INFO %s\r\n", b))
+ routeExpect(rsubRe)
+ pingPong()
+
+ // Make sure it can process an LS+
+ routeSend("LS+ ln1 $G foo\r\n")
+ pingPong()
+
+ if !gacc.SubscriptionInterest("foo") {
+ t.Fatalf("Expected interest on \"foo\"")
+ }
+
+ // This should not have been sent to the leafnode since same origin cluster.
+ time.Sleep(10 * time.Millisecond)
+ if lgacc.SubscriptionInterest("foo") {
+ t.Fatalf("Did not expect interest on \"foo\"")
+ }
+
+ // Create a connection on the leafnode server. 
+ nc, err := nats.Connect(ln.ClientURL())
+ if err != nil {
+ t.Fatalf("Unexpected error connecting %v", err)
+ }
+ defer nc.Close()
+
+ sub, _ := nc.SubscribeSync("bar")
+ // Let it propagate to the main server
+ checkFor(t, time.Second, 10*time.Millisecond, func() error {
+ if !gacc.SubscriptionInterest("bar") {
+ return fmt.Errorf("No interest")
+ }
+ return nil
+ })
+ // For "bar"
+ routeExpect(rlsubRe)
+
+ // Now pretend like we send a message to the main server over the
+ // route but from the same origin cluster, should not be delivered
+ // to the leafnode.
+
+ // Make sure it can process an LMSG.
+ // LMSG for routes is like HMSG with an origin cluster before the account.
+ routeSend("LMSG ln1 $G bar 0 2\r\nok\r\n")
+ pingPong()
+
+ // Let it propagate if not properly truncated.
+ time.Sleep(10 * time.Millisecond)
+ if n, _, _ := sub.Pending(); n != 0 {
+ t.Fatalf("Should not have received the message on bar")
+ }
+
+ // Try one with all the bells and whistles.
+ routeSend("LMSG ln1 $G foo + reply bar baz 0 2\r\nok\r\n")
+ pingPong()
+
+ // Let it propagate if not properly truncated.
+ time.Sleep(10 * time.Millisecond)
+ if n, _, _ := sub.Pending(); n != 0 {
+ t.Fatalf("Should not have received the message on bar")
+ }
+}
diff --git a/test/test.go b/test/test.go
index 81aedf6d..ca3c8b4f 100644
--- a/test/test.go
+++ b/test/test.go
@@ -315,6 +315,7 @@ var (
lsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`)
lunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)?\r\n`)
lmsgRe = regexp.MustCompile(`(?:(?:LMSG\s+([^\s]+)\s+(?:([|+]\s+([\w\s]+)|[^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
+ rlsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`)
)

const (