Backward compatibility with previous servers

Want to keep this commit separate so that we can easily remove
when we no longer want to support both prefixes.

- If this server receives a "$GR." message, it takes the subject
  and tries to process this locally. If there is no cluster race
  reply may be received ok (like before).
- If this server sends a routed reply, it detects if sending to
  an older server (then uses $GR.) or not (then uses $GNR)
- Gateway INFO has a new field that indicates if the server is
  using the new prefix.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2019-11-08 16:22:34 -07:00
parent 9b7dab0548
commit 8a8695d07c
3 changed files with 98 additions and 19 deletions

View File

@@ -1047,8 +1047,8 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
}
// Now get the request id / reply. We need to see if we have a GW prefix and if so strip that off.
reply := rl.ReqId
if isGWRoutedReply([]byte(reply)) {
reply = string(getSubjectFromGWRoutedReply([]byte(reply)))
if gwPrefix, old := isGWRoutedSubjectAndIsOldPrefix([]byte(reply)); gwPrefix {
reply = string(getSubjectFromGWRoutedReply([]byte(reply), old))
}
acc.mu.RLock()
si := acc.imports.services[reply]

View File

@@ -36,8 +36,9 @@ const (
defaultGatewayRecentSubExpiration = 250 * time.Millisecond
defaultGatewayMaxRUnsubBeforeSwitch = 1000
oldGWReplyPrefix = "$GR."
oldGWReplyStart = len(oldGWReplyPrefix) + 5 // len of prefix above + len of hash (4) + "."
oldGWReplyPrefix = "$GR."
oldGWReplyPrefixLen = len(oldGWReplyPrefix)
oldGWReplyStart = oldGWReplyPrefixLen + 5 // len of prefix above + len of hash (4) + "."
// The new prefix is "$GNR.<x>.<cluster>.<server>." where <x> is 1 character
// reserved for service imports, <cluster> is 8 characters hash of origin
@@ -129,6 +130,10 @@ type srvGateway struct {
runknown bool // Rejects unknown (not configured) gateway connections
replyPfx []byte // Will be "$GNR.<1:reserved>.<8:cluster hash>.<8:server hash>."
// For backward compatibility
oldReplyPfx []byte
oldHash []byte
// We maintain the interest of subjects and queues per account.
// For a given account, entries in the map could be something like this:
// foo.bar {n: 3} // 3 subs on foo.bar
@@ -164,6 +169,7 @@ type gatewayCfg struct {
sync.RWMutex
*RemoteGatewayOpts
hash []byte
oldHash []byte
urls map[string]*url.URL
connAttempts int
tlsName string
@@ -183,6 +189,8 @@ type gateway struct {
// Set/check in readLoop without lock. This is to know that an inbound has sent the CONNECT protocol first
connected bool
// Set to true if outbound is to a server that only knows about $GR, not $GNR
useOldPrefix bool
}
// Outbound subject interest entry.
@@ -273,6 +281,13 @@ func getHash(name string) []byte {
return b[:gwHashLen]
}
func getOldHash(name string) []byte {
sha := sha256.New()
sha.Write([]byte(name))
fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil)))
return fullHash[:4]
}
// Initialize the s.gateway structure. We do this even if the server
// does not have a gateway configured. In some part of the code, the
// server will check the number of outbound gateways, etc.. and so
@@ -287,6 +302,7 @@ func (s *Server) newGateway(opts *Options) error {
URLs: make(map[string]struct{}),
resolver: opts.Gateway.resolver,
runknown: opts.Gateway.RejectUnknown,
oldHash: getOldHash(opts.Gateway.Name),
}
gateway.Lock()
defer gateway.Unlock()
@@ -302,6 +318,12 @@ func (s *Server) newGateway(opts *Options) error {
prefix = append(prefix, '.')
gateway.replyPfx = prefix
prefix = make([]byte, 0, oldGWReplyStart)
prefix = append(prefix, oldGWReplyPrefix...)
prefix = append(prefix, gateway.oldHash...)
prefix = append(prefix, '.')
gateway.oldReplyPfx = prefix
gateway.pasi.m = make(map[string]map[string]*sitally)
if gateway.resolver == nil {
@@ -317,6 +339,7 @@ func (s *Server) newGateway(opts *Options) error {
cfg := &gatewayCfg{
RemoteGatewayOpts: rgo.clone(),
hash: getHash(rgo.Name),
oldHash: getOldHash(rgo.Name),
urls: make(map[string]*url.URL, len(rgo.URLs)),
}
if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil {
@@ -442,6 +465,7 @@ func (s *Server) gatewayAcceptLoop(ch chan struct{}) {
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
Gateway: opts.Gateway.Name,
GatewayNRP: true,
}
// If we have selected a random port...
if port == 0 {
@@ -978,6 +1002,7 @@ func (c *client) processGatewayInfo(info *Info) {
// Send INFO too
c.sendInfo(c.gw.infoJSON)
c.gw.infoJSON = nil
c.gw.useOldPrefix = !info.GatewayNRP
c.mu.Unlock()
// Register as an outbound gateway.. if we had a protocol to ack our connect,
@@ -1300,6 +1325,7 @@ func (s *Server) processImplicitGateway(info *Info) {
cfg = &gatewayCfg{
RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName},
hash: getHash(gwName),
oldHash: getOldHash(gwName),
urls: make(map[string]*url.URL, len(info.GatewayURLs)),
implicit: true,
}
@@ -2251,6 +2277,18 @@ func isGWRoutedReply(subj []byte) bool {
return len(subj) > gwSubjectOffset && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix
}
// Same than isGWRoutedReply but accepts the old prefix $GR and returns
// a boolean indicating if this is the old prefix
func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) {
if isGWRoutedReply(subj) {
return true, false
}
if len(subj) > oldGWReplyStart && string(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix {
return true, true
}
return false, false
}
// Returns true if subject starts with "$GNR.". This is to check that
// clients can't publish on this subject.
func hasGWRoutedReplyPrefix(subj []byte) bool {
@@ -2303,6 +2341,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
gws = append(gws, gw.outo[i])
}
thisClusterReplyPrefix := gw.replyPfx
thisClusterOldReplyPrefix := gw.oldReplyPfx
gw.RUnlock()
if len(gws) == 0 {
return
@@ -2326,14 +2365,26 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
// Check if the subject is on the reply prefix, if so, we
// need to send that message directly to the origin cluster.
if isGWRoutedReply(subject) {
dstHash = subject[gwClusterOffset : gwClusterOffset+gwHashLen]
directSend, old := isGWRoutedSubjectAndIsOldPrefix(subject)
if directSend {
if old {
dstHash = subject[oldGWReplyPrefixLen : oldGWReplyStart-1]
} else {
dstHash = subject[gwClusterOffset : gwClusterOffset+gwHashLen]
}
}
for i := 0; i < len(gws); i++ {
gwc := gws[i]
if dstHash != nil {
if directSend {
gwc.mu.Lock()
ok := gwc.gw.cfg != nil && bytes.Equal(dstHash, gwc.gw.cfg.hash)
var ok bool
if gwc.gw.cfg != nil {
if old {
ok = bytes.Equal(dstHash, gwc.gw.cfg.oldHash)
} else {
ok = bytes.Equal(dstHash, gwc.gw.cfg.hash)
}
}
gwc.mu.Unlock()
if !ok {
continue
@@ -2377,7 +2428,14 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
// Decide if we should map.
if gw.shouldMapReplyForGatewaySend(c, acc, reply) {
mreply = mreplya[:0]
mreply = append(mreply, thisClusterReplyPrefix...)
gwc.mu.Lock()
useOldPrefix := gwc.gw.useOldPrefix
gwc.mu.Unlock()
if useOldPrefix {
mreply = append(mreply, thisClusterOldReplyPrefix...)
} else {
mreply = append(mreply, thisClusterReplyPrefix...)
}
mreply = append(mreply, reply...)
}
}
@@ -2535,7 +2593,10 @@ func (s *Server) getRouteByHash(srvHash []byte) *client {
}
// Returns the subject from the routed reply
func getSubjectFromGWRoutedReply(reply []byte) []byte {
func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte {
if isOldPrefix {
return reply[oldGWReplyStart:]
}
return reply[gwSubjectOffset:]
}
@@ -2550,26 +2611,43 @@ func getSubjectFromGWRoutedReply(reply []byte) []byte {
func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// Do not handle GW prefixed messages if this server does not have
// gateway enabled or if the subject does not start with the previx.
if !c.srv.gateway.enabled || !isGWRoutedReply(c.pa.subject) {
if !c.srv.gateway.enabled {
return false
}
isGWPrefix, oldPrefix := isGWRoutedSubjectAndIsOldPrefix(c.pa.subject)
if !isGWPrefix {
return false
}
// Save original subject (in case we have to forward)
orgSubject := c.pa.subject
clusterPrefix := c.pa.subject[gwClusterOffset : gwClusterOffset+gwHashLen]
srvHash := c.pa.subject[gwServerOffset : gwServerOffset+gwHashLen]
subject := c.pa.subject[gwSubjectOffset:]
var clusterHash []byte
var srvHash []byte
var subject []byte
// Check if this reply is intended for our cluster.
if !bytes.Equal(clusterPrefix, c.srv.gateway.getClusterHash()) {
// We could report, for now, just drop.
return true
if oldPrefix {
clusterHash = c.pa.subject[oldGWReplyPrefixLen : oldGWReplyStart-1]
// Check if this reply is intended for our cluster.
if !bytes.Equal(clusterHash, c.srv.gateway.oldHash) {
// We could report, for now, just drop.
return true
}
subject = c.pa.subject[oldGWReplyStart:]
} else {
clusterHash = c.pa.subject[gwClusterOffset : gwClusterOffset+gwHashLen]
// Check if this reply is intended for our cluster.
if !bytes.Equal(clusterHash, c.srv.gateway.getClusterHash()) {
// We could report, for now, just drop.
return true
}
srvHash = c.pa.subject[gwServerOffset : gwServerOffset+gwHashLen]
subject = c.pa.subject[gwSubjectOffset:]
}
var route *client
// If the origin is not this server, get the route this should be sent to.
if c.kind == GATEWAY && !bytes.Equal(srvHash, c.srv.hash) {
if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.hash) {
route = c.srv.getRouteByHash(srvHash)
// This will be possibly nil, and in this case we will try to process
// the interest from this server.

View File

@@ -86,6 +86,7 @@ type Info struct {
GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO)
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
GatewayNRP bool `json:"gateway_nrp,omitempty"` // Uses new $GNR. prefix for mapped replies
// LeafNode Specific
LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.