mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 02:30:40 -07:00
Fixed gateway reply mapping following changes in JetStream clustering
Those changes are required to maintain backward compatibility. Since the replies are "_G_.<gateway name hash>.<server ID hash>" and the hash were 6 characters long, changing to 8 the hash function would break things. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -562,6 +562,11 @@ func (s *Server) startRemoteServerSweepTimer() {
|
||||
// Length of our system hash used for server targeted messages.
|
||||
const sysHashLen = 8
|
||||
|
||||
// Computes a hash of 8 characters for the name.
|
||||
func getHash(name string) []byte {
|
||||
return getHashSize(name, sysHashLen)
|
||||
}
|
||||
|
||||
// This will setup our system wide tracking subs.
|
||||
// For now we will setup one wildcard subscription to
|
||||
// monitor all accounts for changes in number of connections.
|
||||
|
||||
@@ -45,7 +45,7 @@ const (
|
||||
// hash of origin cluster name and <server> is 6 characters hash of origin server pub key.
|
||||
gwReplyPrefix = "_GR_."
|
||||
gwReplyPrefixLen = len(gwReplyPrefix)
|
||||
gwHashLen = sysHashLen
|
||||
gwHashLen = 6
|
||||
gwClusterOffset = gwReplyPrefixLen
|
||||
gwServerOffset = gwClusterOffset + gwHashLen + 1
|
||||
gwSubjectOffset = gwServerOffset + gwHashLen + 1
|
||||
@@ -161,6 +161,10 @@ type srvGateway struct {
|
||||
resolver netResolver // Used to resolve host name before calling net.Dial()
|
||||
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
|
||||
recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply
|
||||
|
||||
// These are used for routing of mapped replies.
|
||||
sIDHash []byte // Server ID hash (6 bytes)
|
||||
routesIDByHash sync.Map // Route's server ID is hashed (6 bytes) and stored in this map.
|
||||
}
|
||||
|
||||
// Subject interest tally. Also indicates if the key in the map is a
|
||||
@@ -273,16 +277,21 @@ func validateGatewayOptions(o *Options) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Computes a hash of 8 characters for the name.
|
||||
// This will be used for routing of replies.
|
||||
func getHash(name string) []byte {
|
||||
// Computes a hash for the given `name`. The result will be `size` characters long.
|
||||
func getHashSize(name string, size int) []byte {
|
||||
sha := sha256.New()
|
||||
sha.Write([]byte(name))
|
||||
b := sha.Sum(nil)
|
||||
for i := 0; i < gwHashLen; i++ {
|
||||
for i := 0; i < size; i++ {
|
||||
b[i] = digits[int(b[i]%base)]
|
||||
}
|
||||
return b[:gwHashLen]
|
||||
return b[:size]
|
||||
}
|
||||
|
||||
// Computes a hash of 6 characters for the name.
|
||||
// This will be used for routing of replies.
|
||||
func getGWHash(name string) []byte {
|
||||
return getHashSize(name, gwHashLen)
|
||||
}
|
||||
|
||||
func getOldHash(name string) []byte {
|
||||
@@ -311,13 +320,13 @@ func (s *Server) newGateway(opts *Options) error {
|
||||
gateway.Lock()
|
||||
defer gateway.Unlock()
|
||||
|
||||
s.hash = getHash(s.info.ID)
|
||||
clusterHash := getHash(opts.Gateway.Name)
|
||||
gateway.sIDHash = getGWHash(s.info.ID)
|
||||
clusterHash := getGWHash(opts.Gateway.Name)
|
||||
prefix := make([]byte, 0, gwSubjectOffset)
|
||||
prefix = append(prefix, gwReplyPrefix...)
|
||||
prefix = append(prefix, clusterHash...)
|
||||
prefix = append(prefix, '.')
|
||||
prefix = append(prefix, s.hash...)
|
||||
prefix = append(prefix, gateway.sIDHash...)
|
||||
prefix = append(prefix, '.')
|
||||
gateway.replyPfx = prefix
|
||||
|
||||
@@ -341,7 +350,7 @@ func (s *Server) newGateway(opts *Options) error {
|
||||
}
|
||||
cfg := &gatewayCfg{
|
||||
RemoteGatewayOpts: rgo.clone(),
|
||||
hash: getHash(rgo.Name),
|
||||
hash: getGWHash(rgo.Name),
|
||||
oldHash: getOldHash(rgo.Name),
|
||||
urls: make(map[string]*url.URL, len(rgo.URLs)),
|
||||
}
|
||||
@@ -1329,7 +1338,7 @@ func (s *Server) processImplicitGateway(info *Info) {
|
||||
opts := s.getOpts()
|
||||
cfg = &gatewayCfg{
|
||||
RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName},
|
||||
hash: getHash(gwName),
|
||||
hash: getGWHash(gwName),
|
||||
oldHash: getOldHash(gwName),
|
||||
urls: make(map[string]*url.URL, len(info.GatewayURLs)),
|
||||
implicit: true,
|
||||
@@ -2650,6 +2659,34 @@ func (s *Server) getRouteByHash(srvHash []byte) *client {
|
||||
return route
|
||||
}
|
||||
|
||||
// Store this route in map with the key being the remote server's name hash
|
||||
// and the remote server's ID hash used by gateway replies mapping routing.
|
||||
func (s *Server) storeRouteByHash(srvNameHash, srvIDHash string, c *client) {
|
||||
s.routesByHash.Store(srvNameHash, c)
|
||||
if !s.gateway.enabled {
|
||||
return
|
||||
}
|
||||
s.gateway.routesIDByHash.Store(srvIDHash, c)
|
||||
}
|
||||
|
||||
// Remove the route with the given keys from the map.
|
||||
func (s *Server) removeRouteByHash(srvNameHash, srvIDHash string) {
|
||||
s.routesByHash.Delete(srvNameHash)
|
||||
if !s.gateway.enabled {
|
||||
return
|
||||
}
|
||||
s.gateway.routesIDByHash.Delete(srvIDHash)
|
||||
}
|
||||
|
||||
// Returns the route with given hash or nil if not found.
|
||||
// This is for gateways only.
|
||||
func (g *srvGateway) getRouteByHash(hash []byte) *client {
|
||||
if v, ok := g.routesIDByHash.Load(string(hash)); ok {
|
||||
return v.(*client)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns the subject from the routed reply
|
||||
func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte {
|
||||
if isOldPrefix {
|
||||
@@ -2705,8 +2742,8 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
|
||||
var route *client
|
||||
|
||||
// If the origin is not this server, get the route this should be sent to.
|
||||
if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.hash) {
|
||||
route = c.srv.getRouteByHash(srvHash)
|
||||
if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.gateway.sIDHash) {
|
||||
route = c.srv.gateway.getRouteByHash(srvHash)
|
||||
// This will be possibly nil, and in this case we will try to process
|
||||
// the interest from this server.
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ type route struct {
|
||||
gatewayURL string
|
||||
leafnodeURL string
|
||||
hash string
|
||||
idHash string
|
||||
}
|
||||
|
||||
type connectInfo struct {
|
||||
@@ -611,8 +612,12 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
if len(info.LeafNodeURLs) == 1 {
|
||||
c.route.leafnodeURL = info.LeafNodeURLs[0]
|
||||
}
|
||||
// Compute the hash of this route based on remoteID
|
||||
// Compute the hash of this route based on remote server name
|
||||
c.route.hash = string(getHash(info.Name))
|
||||
// Same with remote server ID (used for GW mapped replies routing).
|
||||
// Use getGWHash since we don't use the same hash len for that
|
||||
// for backward compatibility.
|
||||
c.route.idHash = string(getGWHash(info.ID))
|
||||
|
||||
// Copy over permissions as well.
|
||||
c.opts.Import = info.Import
|
||||
@@ -1457,11 +1462,12 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
c.route.connectURLs = info.ClientConnectURLs
|
||||
c.route.wsConnURLs = info.WSConnectURLs
|
||||
cid := c.cid
|
||||
hash := string(c.route.hash)
|
||||
hash := c.route.hash
|
||||
idHash := c.route.idHash
|
||||
c.mu.Unlock()
|
||||
|
||||
// Store this route using the hash as the key
|
||||
s.routesByHash.Store(hash, c)
|
||||
s.storeRouteByHash(hash, idHash, c)
|
||||
|
||||
// Now that we have registered the route, we can remove from the temp map.
|
||||
s.removeFromTempClients(cid)
|
||||
@@ -1499,7 +1505,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
// from our list.
|
||||
c.route.leafnodeURL = _EMPTY_
|
||||
// Same for the route hash otherwise it would be removed from s.routesByHash.
|
||||
c.route.hash = _EMPTY_
|
||||
c.route.hash, c.route.idHash = _EMPTY_, _EMPTY_
|
||||
c.mu.Unlock()
|
||||
|
||||
remote.mu.Lock()
|
||||
@@ -2022,6 +2028,7 @@ func (s *Server) removeRoute(c *client) {
|
||||
var lnURL string
|
||||
var gwURL string
|
||||
var hash string
|
||||
var idHash string
|
||||
c.mu.Lock()
|
||||
cid := c.cid
|
||||
r := c.route
|
||||
@@ -2029,6 +2036,7 @@ func (s *Server) removeRoute(c *client) {
|
||||
rID = r.remoteID
|
||||
lnURL = r.leafnodeURL
|
||||
hash = r.hash
|
||||
idHash = r.idHash
|
||||
gwURL = r.gatewayURL
|
||||
}
|
||||
c.mu.Unlock()
|
||||
@@ -2050,7 +2058,7 @@ func (s *Server) removeRoute(c *client) {
|
||||
if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) {
|
||||
s.sendAsyncLeafNodeInfo()
|
||||
}
|
||||
s.routesByHash.Delete(hash)
|
||||
s.removeRouteByHash(hash, idHash)
|
||||
}
|
||||
s.removeFromTempClients(cid)
|
||||
s.mu.Unlock()
|
||||
|
||||
@@ -124,7 +124,6 @@ type Server struct {
|
||||
clients map[uint64]*client
|
||||
routes map[uint64]*client
|
||||
routesByHash sync.Map
|
||||
hash []byte
|
||||
remotes map[string]*client
|
||||
leafs map[uint64]*client
|
||||
users map[string]*User
|
||||
|
||||
Reference in New Issue
Block a user