From 8a8695d07cb7d1fc3b4abfec7912966f205c0ad3 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 8 Nov 2019 16:22:34 -0700 Subject: [PATCH] Backward compatibility with previous servers Want to keep this commit separate so that we can easily remove when we no longer want to support both prefixes. - If this server receives a "$GR." message, it takes the subject and tries to process this locally. If there is no cluster race reply may be received ok (like before). - If this server sends a routed reply, it detects if sending to an older server (then uses $GR.) or not (then uses $GNR) - Gateway INFO has a new field that indicates if the server is using the new prefix. Signed-off-by: Ivan Kozlovic --- server/events.go | 4 +- server/gateway.go | 112 +++++++++++++++++++++++++++++++++++++++------- server/server.go | 1 + 3 files changed, 98 insertions(+), 19 deletions(-) diff --git a/server/events.go b/server/events.go index d8b0899d..cb90c90f 100644 --- a/server/events.go +++ b/server/events.go @@ -1047,8 +1047,8 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st } // Now get the request id / reply. We need to see if we have a GW prefix and if so strip that off. reply := rl.ReqId - if isGWRoutedReply([]byte(reply)) { - reply = string(getSubjectFromGWRoutedReply([]byte(reply))) + if gwPrefix, old := isGWRoutedSubjectAndIsOldPrefix([]byte(reply)); gwPrefix { + reply = string(getSubjectFromGWRoutedReply([]byte(reply), old)) } acc.mu.RLock() si := acc.imports.services[reply] diff --git a/server/gateway.go b/server/gateway.go index 57658f05..38be08e6 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -36,8 +36,9 @@ const ( defaultGatewayRecentSubExpiration = 250 * time.Millisecond defaultGatewayMaxRUnsubBeforeSwitch = 1000 - oldGWReplyPrefix = "$GR." - oldGWReplyStart = len(oldGWReplyPrefix) + 5 // len of prefix above + len of hash (4) + "." + oldGWReplyPrefix = "$GR." + oldGWReplyPrefixLen = len(oldGWReplyPrefix) + oldGWReplyStart = oldGWReplyPrefixLen + 5 // len of prefix above + len of hash (4) + "." // The new prefix is "$GNR...." where is 1 character // reserved for service imports, is 8 characters hash of origin @@ -129,6 +130,10 @@ type srvGateway struct { runknown bool // Rejects unknown (not configured) gateway connections replyPfx []byte // Will be "$GNR.<1:reserved>.<8:cluster hash>.<8:server hash>." + // For backward compatibility + oldReplyPfx []byte + oldHash []byte + // We maintain the interest of subjects and queues per account. // For a given account, entries in the map could be something like this: // foo.bar {n: 3} // 3 subs on foo.bar @@ -164,6 +169,7 @@ type gatewayCfg struct { sync.RWMutex *RemoteGatewayOpts hash []byte + oldHash []byte urls map[string]*url.URL connAttempts int tlsName string @@ -183,6 +189,8 @@ type gateway struct { // Set/check in readLoop without lock. This is to know that an inbound has sent the CONNECT protocol first connected bool + // Set to true if outbound is to a server that only knows about $GR, not $GNR + useOldPrefix bool } // Outbound subject interest entry. @@ -273,6 +281,13 @@ func getHash(name string) []byte { return b[:gwHashLen] } +func getOldHash(name string) []byte { + sha := sha256.New() + sha.Write([]byte(name)) + fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil))) + return fullHash[:4] +} + // Initialize the s.gateway structure. We do this even if the server // does not have a gateway configured. In some part of the code, the // server will check the number of outbound gateways, etc.. and so @@ -287,6 +302,7 @@ func (s *Server) newGateway(opts *Options) error { URLs: make(map[string]struct{}), resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, + oldHash: getOldHash(opts.Gateway.Name), } gateway.Lock() defer gateway.Unlock() @@ -302,6 +318,12 @@ func (s *Server) newGateway(opts *Options) error { prefix = append(prefix, '.') gateway.replyPfx = prefix + prefix = make([]byte, 0, oldGWReplyStart) + prefix = append(prefix, oldGWReplyPrefix...) + prefix = append(prefix, gateway.oldHash...) + prefix = append(prefix, '.') + gateway.oldReplyPfx = prefix + gateway.pasi.m = make(map[string]map[string]*sitally) if gateway.resolver == nil { @@ -317,6 +339,7 @@ func (s *Server) newGateway(opts *Options) error { cfg := &gatewayCfg{ RemoteGatewayOpts: rgo.clone(), hash: getHash(rgo.Name), + oldHash: getOldHash(rgo.Name), urls: make(map[string]*url.URL, len(rgo.URLs)), } if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil { @@ -442,6 +465,7 @@ func (s *Server) gatewayAcceptLoop(ch chan struct{}) { TLSVerify: tlsReq, MaxPayload: s.info.MaxPayload, Gateway: opts.Gateway.Name, + GatewayNRP: true, } // If we have selected a random port... if port == 0 { @@ -978,6 +1002,7 @@ func (c *client) processGatewayInfo(info *Info) { // Send INFO too c.sendInfo(c.gw.infoJSON) c.gw.infoJSON = nil + c.gw.useOldPrefix = !info.GatewayNRP c.mu.Unlock() // Register as an outbound gateway.. if we had a protocol to ack our connect, @@ -1300,6 +1325,7 @@ func (s *Server) processImplicitGateway(info *Info) { cfg = &gatewayCfg{ RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName}, hash: getHash(gwName), + oldHash: getOldHash(gwName), urls: make(map[string]*url.URL, len(info.GatewayURLs)), implicit: true, } @@ -2251,6 +2277,18 @@ func isGWRoutedReply(subj []byte) bool { return len(subj) > gwSubjectOffset && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix } +// Same than isGWRoutedReply but accepts the old prefix $GR and returns +// a boolean indicating if this is the old prefix +func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) { + if isGWRoutedReply(subj) { + return true, false + } + if len(subj) > oldGWReplyStart && string(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix { + return true, true + } + return false, false +} + // Returns true if subject starts with "$GNR.". This is to check that // clients can't publish on this subject. func hasGWRoutedReplyPrefix(subj []byte) bool { @@ -2303,6 +2341,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr gws = append(gws, gw.outo[i]) } thisClusterReplyPrefix := gw.replyPfx + thisClusterOldReplyPrefix := gw.oldReplyPfx gw.RUnlock() if len(gws) == 0 { return @@ -2326,14 +2365,26 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr // Check if the subject is on the reply prefix, if so, we // need to send that message directly to the origin cluster. - if isGWRoutedReply(subject) { - dstHash = subject[gwClusterOffset : gwClusterOffset+gwHashLen] + directSend, old := isGWRoutedSubjectAndIsOldPrefix(subject) + if directSend { + if old { + dstHash = subject[oldGWReplyPrefixLen : oldGWReplyStart-1] + } else { + dstHash = subject[gwClusterOffset : gwClusterOffset+gwHashLen] + } } for i := 0; i < len(gws); i++ { gwc := gws[i] - if dstHash != nil { + if directSend { gwc.mu.Lock() - ok := gwc.gw.cfg != nil && bytes.Equal(dstHash, gwc.gw.cfg.hash) + var ok bool + if gwc.gw.cfg != nil { + if old { + ok = bytes.Equal(dstHash, gwc.gw.cfg.oldHash) + } else { + ok = bytes.Equal(dstHash, gwc.gw.cfg.hash) + } + } gwc.mu.Unlock() if !ok { continue @@ -2377,7 +2428,14 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr // Decide if we should map. if gw.shouldMapReplyForGatewaySend(c, acc, reply) { mreply = mreplya[:0] - mreply = append(mreply, thisClusterReplyPrefix...) + gwc.mu.Lock() + useOldPrefix := gwc.gw.useOldPrefix + gwc.mu.Unlock() + if useOldPrefix { + mreply = append(mreply, thisClusterOldReplyPrefix...) + } else { + mreply = append(mreply, thisClusterReplyPrefix...) + } mreply = append(mreply, reply...) } } @@ -2535,7 +2593,10 @@ func (s *Server) getRouteByHash(srvHash []byte) *client { } // Returns the subject from the routed reply -func getSubjectFromGWRoutedReply(reply []byte) []byte { +func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte { + if isOldPrefix { + return reply[oldGWReplyStart:] + } return reply[gwSubjectOffset:] } @@ -2550,26 +2611,43 @@ func getSubjectFromGWRoutedReply(reply []byte) []byte { func (c *client) handleGatewayReply(msg []byte) (processed bool) { // Do not handle GW prefixed messages if this server does not have // gateway enabled or if the subject does not start with the previx. - if !c.srv.gateway.enabled || !isGWRoutedReply(c.pa.subject) { + if !c.srv.gateway.enabled { + return false + } + isGWPrefix, oldPrefix := isGWRoutedSubjectAndIsOldPrefix(c.pa.subject) + if !isGWPrefix { return false } // Save original subject (in case we have to forward) orgSubject := c.pa.subject - clusterPrefix := c.pa.subject[gwClusterOffset : gwClusterOffset+gwHashLen] - srvHash := c.pa.subject[gwServerOffset : gwServerOffset+gwHashLen] - subject := c.pa.subject[gwSubjectOffset:] + var clusterHash []byte + var srvHash []byte + var subject []byte - // Check if this reply is intended for our cluster. - if !bytes.Equal(clusterPrefix, c.srv.gateway.getClusterHash()) { - // We could report, for now, just drop. - return true + if oldPrefix { + clusterHash = c.pa.subject[oldGWReplyPrefixLen : oldGWReplyStart-1] + // Check if this reply is intended for our cluster. + if !bytes.Equal(clusterHash, c.srv.gateway.oldHash) { + // We could report, for now, just drop. + return true + } + subject = c.pa.subject[oldGWReplyStart:] + } else { + clusterHash = c.pa.subject[gwClusterOffset : gwClusterOffset+gwHashLen] + // Check if this reply is intended for our cluster. + if !bytes.Equal(clusterHash, c.srv.gateway.getClusterHash()) { + // We could report, for now, just drop. + return true + } + srvHash = c.pa.subject[gwServerOffset : gwServerOffset+gwHashLen] + subject = c.pa.subject[gwSubjectOffset:] } var route *client // If the origin is not this server, get the route this should be sent to. - if c.kind == GATEWAY && !bytes.Equal(srvHash, c.srv.hash) { + if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.hash) { route = c.srv.getRouteByHash(srvHash) // This will be possibly nil, and in this case we will try to process // the interest from this server. diff --git a/server/server.go b/server/server.go index 1562a308..3c058001 100644 --- a/server/server.go +++ b/server/server.go @@ -86,6 +86,7 @@ type Info struct { GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO) GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed + GatewayNRP bool `json:"gateway_nrp,omitempty"` // Uses new $GNR. prefix for mapped replies // LeafNode Specific LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.