From ef38abe75b8a35daa0ecea8ea5d2f029a593921d Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 15 Jan 2021 17:32:04 -0700 Subject: [PATCH] Fixed gateway reply mapping following changes in JetStream clustering Those changes are required to maintain backward compatibility. Since the replies are "_G_.." and the hash were 6 characters long, changing to 8 the hash function would break things. Signed-off-by: Ivan Kozlovic --- server/events.go | 5 ++++ server/gateway.go | 63 +++++++++++++++++++++++++++++++++++++---------- server/route.go | 18 ++++++++++---- server/server.go | 1 - 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/server/events.go b/server/events.go index afef07ff..c358802e 100644 --- a/server/events.go +++ b/server/events.go @@ -562,6 +562,11 @@ func (s *Server) startRemoteServerSweepTimer() { // Length of our system hash used for server targeted messages. const sysHashLen = 8 +// Computes a hash of 8 characters for the name. +func getHash(name string) []byte { + return getHashSize(name, sysHashLen) +} + // This will setup our system wide tracking subs. // For now we will setup one wildcard subscription to // monitor all accounts for changes in number of connections. diff --git a/server/gateway.go b/server/gateway.go index 505d9316..c6f337f7 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -45,7 +45,7 @@ const ( // hash of origin cluster name and is 6 characters hash of origin server pub key. gwReplyPrefix = "_GR_." gwReplyPrefixLen = len(gwReplyPrefix) - gwHashLen = sysHashLen + gwHashLen = 6 gwClusterOffset = gwReplyPrefixLen gwServerOffset = gwClusterOffset + gwHashLen + 1 gwSubjectOffset = gwServerOffset + gwHashLen + 1 @@ -161,6 +161,10 @@ type srvGateway struct { resolver netResolver // Used to resolve host name before calling net.Dial() sqbsz int // Max buffer size to send queue subs protocol. Used for testing. recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply + + // These are used for routing of mapped replies. + sIDHash []byte // Server ID hash (6 bytes) + routesIDByHash sync.Map // Route's server ID is hashed (6 bytes) and stored in this map. } // Subject interest tally. Also indicates if the key in the map is a @@ -273,16 +277,21 @@ func validateGatewayOptions(o *Options) error { return nil } -// Computes a hash of 8 characters for the name. -// This will be used for routing of replies. -func getHash(name string) []byte { +// Computes a hash for the given `name`. The result will be `size` characters long. +func getHashSize(name string, size int) []byte { sha := sha256.New() sha.Write([]byte(name)) b := sha.Sum(nil) - for i := 0; i < gwHashLen; i++ { + for i := 0; i < size; i++ { b[i] = digits[int(b[i]%base)] } - return b[:gwHashLen] + return b[:size] +} + +// Computes a hash of 6 characters for the name. +// This will be used for routing of replies. +func getGWHash(name string) []byte { + return getHashSize(name, gwHashLen) } func getOldHash(name string) []byte { @@ -311,13 +320,13 @@ func (s *Server) newGateway(opts *Options) error { gateway.Lock() defer gateway.Unlock() - s.hash = getHash(s.info.ID) - clusterHash := getHash(opts.Gateway.Name) + gateway.sIDHash = getGWHash(s.info.ID) + clusterHash := getGWHash(opts.Gateway.Name) prefix := make([]byte, 0, gwSubjectOffset) prefix = append(prefix, gwReplyPrefix...) prefix = append(prefix, clusterHash...) prefix = append(prefix, '.') - prefix = append(prefix, s.hash...) + prefix = append(prefix, gateway.sIDHash...) prefix = append(prefix, '.') gateway.replyPfx = prefix @@ -341,7 +350,7 @@ func (s *Server) newGateway(opts *Options) error { } cfg := &gatewayCfg{ RemoteGatewayOpts: rgo.clone(), - hash: getHash(rgo.Name), + hash: getGWHash(rgo.Name), oldHash: getOldHash(rgo.Name), urls: make(map[string]*url.URL, len(rgo.URLs)), } @@ -1329,7 +1338,7 @@ func (s *Server) processImplicitGateway(info *Info) { opts := s.getOpts() cfg = &gatewayCfg{ RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName}, - hash: getHash(gwName), + hash: getGWHash(gwName), oldHash: getOldHash(gwName), urls: make(map[string]*url.URL, len(info.GatewayURLs)), implicit: true, @@ -2650,6 +2659,34 @@ func (s *Server) getRouteByHash(srvHash []byte) *client { return route } +// Store this route in map with the key being the remote server's name hash +// and the remote server's ID hash used by gateway replies mapping routing. +func (s *Server) storeRouteByHash(srvNameHash, srvIDHash string, c *client) { + s.routesByHash.Store(srvNameHash, c) + if !s.gateway.enabled { + return + } + s.gateway.routesIDByHash.Store(srvIDHash, c) +} + +// Remove the route with the given keys from the map. +func (s *Server) removeRouteByHash(srvNameHash, srvIDHash string) { + s.routesByHash.Delete(srvNameHash) + if !s.gateway.enabled { + return + } + s.gateway.routesIDByHash.Delete(srvIDHash) +} + +// Returns the route with given hash or nil if not found. +// This is for gateways only. +func (g *srvGateway) getRouteByHash(hash []byte) *client { + if v, ok := g.routesIDByHash.Load(string(hash)); ok { + return v.(*client) + } + return nil +} + // Returns the subject from the routed reply func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte { if isOldPrefix { @@ -2705,8 +2742,8 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { var route *client // If the origin is not this server, get the route this should be sent to. - if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.hash) { - route = c.srv.getRouteByHash(srvHash) + if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.gateway.sIDHash) { + route = c.srv.gateway.getRouteByHash(srvHash) // This will be possibly nil, and in this case we will try to process // the interest from this server. } diff --git a/server/route.go b/server/route.go index ebc6508f..42873021 100644 --- a/server/route.go +++ b/server/route.go @@ -82,6 +82,7 @@ type route struct { gatewayURL string leafnodeURL string hash string + idHash string } type connectInfo struct { @@ -611,8 +612,12 @@ func (c *client) processRouteInfo(info *Info) { if len(info.LeafNodeURLs) == 1 { c.route.leafnodeURL = info.LeafNodeURLs[0] } - // Compute the hash of this route based on remoteID + // Compute the hash of this route based on remote server name c.route.hash = string(getHash(info.Name)) + // Same with remote server ID (used for GW mapped replies routing). + // Use getGWHash since we don't use the same hash len for that + // for backward compatibility. + c.route.idHash = string(getGWHash(info.ID)) // Copy over permissions as well. c.opts.Import = info.Import @@ -1457,11 +1462,12 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { c.route.connectURLs = info.ClientConnectURLs c.route.wsConnURLs = info.WSConnectURLs cid := c.cid - hash := string(c.route.hash) + hash := c.route.hash + idHash := c.route.idHash c.mu.Unlock() // Store this route using the hash as the key - s.routesByHash.Store(hash, c) + s.storeRouteByHash(hash, idHash, c) // Now that we have registered the route, we can remove from the temp map. s.removeFromTempClients(cid) @@ -1499,7 +1505,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { // from our list. c.route.leafnodeURL = _EMPTY_ // Same for the route hash otherwise it would be removed from s.routesByHash. - c.route.hash = _EMPTY_ + c.route.hash, c.route.idHash = _EMPTY_, _EMPTY_ c.mu.Unlock() remote.mu.Lock() @@ -2022,6 +2028,7 @@ func (s *Server) removeRoute(c *client) { var lnURL string var gwURL string var hash string + var idHash string c.mu.Lock() cid := c.cid r := c.route @@ -2029,6 +2036,7 @@ func (s *Server) removeRoute(c *client) { rID = r.remoteID lnURL = r.leafnodeURL hash = r.hash + idHash = r.idHash gwURL = r.gatewayURL } c.mu.Unlock() @@ -2050,7 +2058,7 @@ func (s *Server) removeRoute(c *client) { if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) { s.sendAsyncLeafNodeInfo() } - s.routesByHash.Delete(hash) + s.removeRouteByHash(hash, idHash) } s.removeFromTempClients(cid) s.mu.Unlock() diff --git a/server/server.go b/server/server.go index 7d833010..240f5c04 100644 --- a/server/server.go +++ b/server/server.go @@ -124,7 +124,6 @@ type Server struct { clients map[uint64]*client routes map[uint64]*client routesByHash sync.Map - hash []byte remotes map[string]*client leafs map[uint64]*client users map[string]*User