diff --git a/server/consumer.go b/server/consumer.go index a70518ce..60af9a4a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3857,7 +3857,7 @@ func (o *consumer) String() string { } func createConsumerName() string { - return string(getHash(nuid.Next())) + return getHash(nuid.Next()) } // deleteConsumer will delete the consumer from this stream. diff --git a/server/events.go b/server/events.go index 1c562368..cdad49c1 100644 --- a/server/events.go +++ b/server/events.go @@ -16,6 +16,7 @@ package server import ( "bytes" "compress/gzip" + "crypto/sha256" "encoding/json" "errors" "fmt" @@ -694,7 +695,7 @@ func (s *Server) sendStatsz(subj string) { js.mu.RUnlock() jStat.Stats = js.usageStats() // Update our own usage since we do not echo so we will not hear ourselves. - ourNode := string(getHash(s.serverName())) + ourNode := getHash(s.serverName()) if v, ok := s.nodeToInfo.Load(ourNode); ok && v != nil { ni := v.(nodeInfo) ni.stats = jStat.Stats @@ -708,13 +709,21 @@ func (s *Server) sendStatsz(subj string) { if mg := js.getMetaGroup(); mg != nil { if mg.Leader() { if ci := s.raftNodeToClusterInfo(mg); ci != nil { - jStat.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()} + jStat.Meta = &MetaClusterInfo{ + Name: ci.Name, + Leader: ci.Leader, + Peer: getHash(ci.Leader), + Replicas: ci.Replicas, + Size: mg.ClusterSize(), + } } } else { // non leader only include a shortened version without peers + leader := s.serverNameForNode(mg.GroupLeader()) jStat.Meta = &MetaClusterInfo{ Name: mg.Group(), - Leader: s.serverNameForNode(mg.GroupLeader()), + Leader: leader, + Peer: getHash(leader), Size: mg.ClusterSize(), } } @@ -766,10 +775,41 @@ func (s *Server) startRemoteServerSweepTimer() { const sysHashLen = 8 // Computes a hash of 8 characters for the name. -func getHash(name string) []byte { +func getHash(name string) string { return getHashSize(name, sysHashLen) } +var nameToHashSize8 = sync.Map{} +var nameToHashSize6 = sync.Map{} + +// Computes a hash for the given `name`. The result will be `size` characters long. +func getHashSize(name string, size int) string { + compute := func() string { + sha := sha256.New() + sha.Write([]byte(name)) + b := sha.Sum(nil) + for i := 0; i < size; i++ { + b[i] = digits[int(b[i]%base)] + } + return string(b[:size]) + } + var m *sync.Map + switch size { + case 8: + m = &nameToHashSize8 + case 6: + m = &nameToHashSize6 + default: + return compute() + } + if v, ok := m.Load(name); ok { + return v.(string) + } + h := compute() + m.Store(name, h) + return h +} + // Returns the node name for this server which is a hash of the server name. func (s *Server) Node() string { s.mu.RLock() @@ -791,7 +831,7 @@ func (s *Server) initEventTracking() { return } // Create a system hash which we use for other servers to target us specifically. - s.sys.shash = string(getHash(s.info.Name)) + s.sys.shash = getHash(s.info.Name) // This will be for all inbox responses. subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*") @@ -1162,7 +1202,7 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, } // JetStream node updates if applicable. - node := string(getHash(si.Name)) + node := getHash(si.Name) if v, ok := s.nodeToInfo.Load(node); ok && v != nil { ni := v.(nodeInfo) ni.offline = true @@ -1200,7 +1240,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su stats = ssm.Stats.JetStream.Stats } - node := string(getHash(si.Name)) + node := getHash(si.Name) s.nodeToInfo.Store(node, nodeInfo{ si.Name, si.Version, @@ -1248,7 +1288,7 @@ func (s *Server) processNewServer(si *ServerInfo) { // Add to our nodeToName if s.sameDomain(si.Domain) { - node := string(getHash(si.Name)) + node := getHash(si.Name) // Only update if non-existent if _, ok := s.nodeToInfo.Load(node); !ok { s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Version, si.Cluster, si.Domain, si.ID, si.Tags, nil, nil, false, si.JetStream}) diff --git a/server/events_test.go b/server/events_test.go index e5ef65b4..605e687f 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -15,8 +15,10 @@ package server import ( "bytes" + "crypto/sha256" "encoding/json" "fmt" + "math/rand" "net/http" "net/http/httptest" "os" @@ -29,6 +31,7 @@ import ( "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" + "github.com/nats-io/nuid" ) func createAccount(s *Server) (*Account, nkeys.KeyPair) { @@ -2498,3 +2501,44 @@ func TestServerEventsAndDQSubscribers(t *testing.T) { checkSubsPending(t, sub, 10) } + +func Benchmark_GetHash(b *testing.B) { + b.StopTimer() + // Get 100 random names + names := make([]string, 0, 100) + for i := 0; i < 100; i++ { + names = append(names, nuid.Next()) + } + hashes := make([]string, 0, 100) + for j := 0; j < 100; j++ { + sha := sha256.New() + sha.Write([]byte(names[j])) + b := sha.Sum(nil) + for i := 0; i < 8; i++ { + b[i] = digits[int(b[i]%base)] + } + hashes = append(hashes, string(b[:8])) + } + wg := sync.WaitGroup{} + wg.Add(8) + errCh := make(chan error, 8) + b.StartTimer() + for i := 0; i < 8; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + idx := rand.Intn(100) + if h := getHash(names[idx]); h != hashes[idx] { + errCh <- fmt.Errorf("Hash for name %q was %q, but should be %q", names[idx], h, hashes[idx]) + return + } + } + }() + } + wg.Wait() + select { + case err := <-errCh: + b.Fatal(err.Error()) + default: + } +} diff --git a/server/gateway.go b/server/gateway.go index 1fb0ce9b..f993832f 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -320,21 +320,10 @@ func validateGatewayOptions(o *Options) error { return nil } -// Computes a hash for the given `name`. The result will be `size` characters long. -func getHashSize(name string, size int) []byte { - sha := sha256.New() - sha.Write([]byte(name)) - b := sha.Sum(nil) - for i := 0; i < size; i++ { - b[i] = digits[int(b[i]%base)] - } - return b[:size] -} - // Computes a hash of 6 characters for the name. // This will be used for routing of replies. func getGWHash(name string) []byte { - return getHashSize(name, gwHashLen) + return []byte(getHashSize(name, gwHashLen)) } func getOldHash(name string) []byte { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d4905a84..caf34ef7 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2149,7 +2149,7 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco // Check to see if we are a member of the group and if the group has no leader. // Peers here is a server name, convert to node name. - nodeName := string(getHash(req.Peer)) + nodeName := getHash(req.Peer) js.mu.RLock() rg := sa.Group diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 94b928a9..83791437 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4846,7 +4846,7 @@ func groupNameForConsumer(peers []string, storage StorageType) string { } func groupName(prefix string, peers []string, storage StorageType) string { - gns := string(getHash(nuid.Next())) + gns := getHash(nuid.Next()) return fmt.Sprintf("%s-R%d%s-%s", prefix, len(peers), storage.String()[:1], gns) } @@ -5245,7 +5245,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil { s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err) } else if cl := si.(*StreamInfo).Cluster; cl != nil { - curLeader = string(getHash(cl.Leader)) + curLeader = getHash(cl.Leader) } // Re-acquire here. js.mu.Lock() @@ -6137,7 +6137,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil { s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err) } else if cl := ci.(*ConsumerInfo).Cluster; cl != nil { - curLeader = string(getHash(cl.Leader)) + curLeader = getHash(cl.Leader) } // Re-acquire here. js.mu.Lock() @@ -7174,7 +7174,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { func (mset *stream) checkClusterInfo(ci *ClusterInfo) { for _, r := range ci.Replicas { - peer := string(getHash(r.Name)) + peer := getHash(r.Name) if lag := mset.lagForCatchupPeer(peer); lag > 0 { r.Current = false r.Lag = lag diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 1721cfdd..283388c3 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2770,9 +2770,9 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) { } expectedPeers := map[string]struct{}{ - string(getHash(streamPeerSrv[0])): {}, - string(getHash(streamPeerSrv[1])): {}, - string(getHash(streamPeerSrv[2])): {}, + getHash(streamPeerSrv[0]): {}, + getHash(streamPeerSrv[1]): {}, + getHash(streamPeerSrv[2]): {}, } _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) @@ -2981,7 +2981,7 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) { } // test list order invariant js, cc := s.getJetStreamCluster() - sExpHash := string(getHash(sExpected)) + sExpHash := getHash(sExpected) js.mu.Lock() if sa, ok := cc.streams["$G"]["TEST"]; !ok { js.mu.Unlock() diff --git a/server/monitor.go b/server/monitor.go index 6337634e..355b11e6 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1403,7 +1403,7 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { v.Stats = js.usageStats() if mg := js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { - v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()} + v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()} if ci.Leader == s.info.Name { v.Meta.Replicas = ci.Replicas } @@ -2656,6 +2656,7 @@ type AccountDetail struct { type MetaClusterInfo struct { Name string `json:"name,omitempty"` Leader string `json:"leader,omitempty"` + Peer string `json:"peer,omitempty"` Replicas []*PeerInfo `json:"replicas,omitempty"` Size int `json:"cluster_size"` } @@ -2835,7 +2836,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { if mg := js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { - jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()} + jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()} if isLeader { jsi.Meta.Replicas = ci.Replicas } diff --git a/server/monitor_test.go b/server/monitor_test.go index b8eed2a3..ecf861f9 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4363,6 +4363,9 @@ func TestMonitorJsz(t *testing.T) { found := 0 for i, url := range []string{monUrl1, monUrl2} { info := readJsInfo(url + "") + if info.Meta.Peer != getHash(info.Meta.Leader) { + t.Fatalf("Invalid Peer: %+v", info.Meta) + } if info.Meta.Replicas != nil { found++ for _, r := range info.Meta.Replicas { diff --git a/server/mqtt.go b/server/mqtt.go index f4d733c0..8d36130c 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -926,7 +926,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc c := s.createInternalAccountClient() c.acc = acc - id := string(getHash(s.Name())) + id := getHash(s.Name()) replicas := opts.MQTT.StreamReplicas if replicas <= 0 { replicas = s.mqttDetermineReplicas() @@ -2110,7 +2110,7 @@ func (as *mqttAccountSessionManager) createOrRestoreSession(clientID string, opt return nil, false, fmt.Errorf("%s for account %q, session %q: %v", errTxt, accName, clientID, err) } - hash := string(getHash(clientID)) + hash := getHash(clientID) subject := mqttSessStreamSubjectPrefix + as.domainTk + hash smsg, err := jsa.loadLastMsgFor(mqttSessStreamName, subject) if err != nil { @@ -2208,7 +2208,7 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve continue } // Compute subject where the session is being stored - subject := mqttSessStreamSubjectPrefix + as.domainTk + string(getHash(ps.ID)) + subject := mqttSessStreamSubjectPrefix + as.domainTk + getHash(ps.ID) // Store record to MQTT session stream if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil { log.Errorf(" Unable to transfer the session record: %v", err) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 779a3e49..acfa516e 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -5290,7 +5290,7 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) { // Create 2 streams that start with "$MQTT_sess_" to check for transfer to new // mux'ed unique "$MQTT_sess" stream. One of this stream will not contain a // proper session record, and we will check that the stream does not get deleted. - sessStreamName1 := mqttSessionsStreamNamePrefix + string(getHash("sub")) + sessStreamName1 := mqttSessionsStreamNamePrefix + getHash("sub") if _, err := js.AddStream(&nats.StreamConfig{ Name: sessStreamName1, Subjects: []string{sessStreamName1}, diff --git a/server/norace_test.go b/server/norace_test.go index 49a6ef83..b8766e47 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3413,7 +3413,7 @@ func TestNoRaceJetStreamLastSubjSeqAndFilestoreCompact(t *testing.T) { secondPayload := make([]byte, 380) for iter := 0; iter < 2; iter++ { for i := 0; i < 4000; i++ { - subj := "MQTT.sess." + string(getHash(fmt.Sprintf("client_%d", i))) + subj := "MQTT.sess." + getHash(fmt.Sprintf("client_%d", i)) pa, err := js.Publish(subj, firstPayload) if err != nil { t.Fatalf("Error on publish: %v", err) diff --git a/server/route.go b/server/route.go index e8847118..0ed56a48 100644 --- a/server/route.go +++ b/server/route.go @@ -609,7 +609,7 @@ func (c *client) processRouteInfo(info *Info) { c.route.leafnodeURL = info.LeafNodeURLs[0] } // Compute the hash of this route based on remote server name - c.route.hash = string(getHash(info.Name)) + c.route.hash = getHash(info.Name) // Same with remote server ID (used for GW mapped replies routing). // Use getGWHash since we don't use the same hash len for that // for backward compatibility. diff --git a/server/server.go b/server/server.go index 97cba560..35f17d5a 100644 --- a/server/server.go +++ b/server/server.go @@ -426,7 +426,7 @@ func NewServer(opts *Options) (*Server, error) { // Place ourselves in the JetStream nodeInfo if needed. if opts.JetStream { - ourNode := string(getHash(serverName)) + ourNode := getHash(serverName) s.nodeToInfo.Store(ourNode, nodeInfo{ serverName, VERSION, @@ -3057,7 +3057,7 @@ func (s *Server) ID() string { // NodeName returns the node name for this server. func (s *Server) NodeName() string { - return string(getHash(s.info.Name)) + return getHash(s.info.Name) } // Name returns the server's name. This will be the same as the ID if it was not set. diff --git a/server/stream.go b/server/stream.go index 7959aff9..d03892b8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -555,7 +555,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // use additional information in case the stream names are the same. func (ssi *StreamSource) setIndexName() { if ssi.External != nil { - ssi.iname = ssi.Name + ":" + string(getHash(ssi.External.ApiPrefix)) + ssi.iname = ssi.Name + ":" + getHash(ssi.External.ApiPrefix) } else { ssi.iname = ssi.Name }