diff --git a/server/consumer.go b/server/consumer.go index 54bd3fd0..dae9cec7 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1556,7 +1556,7 @@ func (o *consumer) deleteNotActive() { } } - s, js := o.mset.srv, o.mset.srv.js + s, js := o.mset.srv, o.srv.js.Load() acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct o.mu.Unlock() diff --git a/server/events.go b/server/events.go index bb249d43..0f761a47 100644 --- a/server/events.go +++ b/server/events.go @@ -875,7 +875,7 @@ func (s *Server) sendStatsz(subj string) { m.Stats.ActiveServers = len(s.sys.servers) + 1 // JetStream - if js := s.js; js != nil { + if js := s.js.Load(); js != nil { jStat := &JetStreamVarz{} s.mu.RUnlock() js.mu.RLock() diff --git a/server/filestore.go b/server/filestore.go index 2e4a5378..909629eb 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4729,29 +4729,30 @@ func (fs *fileStore) syncBlocks() { mb.mu.Unlock() continue } + // See if we can close FDs due to being idle. + if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle { + mb.dirtyCloseWithRemove(false) + } + // Check if we need to sync. We will not hold lock during actual sync. + var fn string if mb.needSync { // Flush anything that may be pending. if mb.pendingWriteSizeLocked() > 0 { mb.flushPendingMsgsLocked() } - if mb.mfd != nil { - mb.mfd.Sync() - } else { - fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) - if err != nil { - mb.mu.Unlock() - continue - } + fn = mb.mfn + mb.needSync = false + } + mb.mu.Unlock() + + // Check if we need to sync. + // This is done not holding any locks. + if fn != _EMPTY_ { + if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil { fd.Sync() fd.Close() } - mb.needSync = false } - // See if we can close FDs due to being idle. - if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle { - mb.dirtyCloseWithRemove(false) - } - mb.mu.Unlock() } fs.mu.Lock() diff --git a/server/gateway.go b/server/gateway.go index 5c3b86f3..715a2c1d 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1128,8 +1128,8 @@ func (c *client) processGatewayInfo(info *Info) { // connect events to switch those accounts into interest only mode. s.mu.Lock() s.ensureGWsInterestOnlyForLeafNodes() - js := s.js s.mu.Unlock() + js := s.js.Load() // If running in some tests, maintain the original behavior. if gwDoNotForceInterestOnlyMode && js != nil { diff --git a/server/jetstream.go b/server/jetstream.go index 04b7430a..756e75a5 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -117,9 +117,11 @@ type jetStream struct { // Some bools regarding general state. metaRecovering bool standAlone bool - disabled bool oos bool shuttingDown bool + + // Atomic versions + disabled atomic.Bool } type remoteUsage struct { @@ -372,9 +374,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { } s.gcbMu.Unlock() - s.mu.Lock() - s.js = js - s.mu.Unlock() + s.js.Store(js) // FIXME(dlc) - Allow memory only operation? if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) { @@ -530,10 +530,7 @@ func (s *Server) setupJetStreamExports() { } func (s *Server) jetStreamOOSPending() (wasPending bool) { - s.mu.Lock() - js := s.js - s.mu.Unlock() - if js != nil { + if js := s.getJetStream(); js != nil { js.mu.Lock() wasPending = js.oos js.oos = true @@ -543,13 +540,8 @@ func (s *Server) jetStreamOOSPending() (wasPending bool) { } func (s *Server) setJetStreamDisabled() { - s.mu.Lock() - js := s.js - s.mu.Unlock() - if js != nil { - js.mu.Lock() - js.disabled = true - js.mu.Unlock() + if js := s.getJetStream(); js != nil { + js.disabled.Store(true) } } @@ -738,16 +730,15 @@ func (s *Server) configAllJetStreamAccounts() error { // a non-default system account. s.checkJetStreamExports() - // Snapshot into our own list. Might not be needed. - s.mu.Lock() // Bail if server not enabled. If it was enabled and a reload turns it off // that will be handled elsewhere. - js := s.js + js := s.getJetStream() if js == nil { - s.mu.Unlock() return nil } + // Snapshot into our own list. Might not be needed. + s.mu.RLock() if s.sys != nil { // clustered stream removal will perform this cleanup as well // this is mainly for initial cleanup @@ -764,12 +755,12 @@ func (s *Server) configAllJetStreamAccounts() error { } var jsAccounts []*Account - s.accounts.Range(func(k, v interface{}) bool { + s.accounts.Range(func(k, v any) bool { jsAccounts = append(jsAccounts, v.(*Account)) return true }) accounts := &s.accounts - s.mu.Unlock() + s.mu.RUnlock() // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. @@ -809,9 +800,7 @@ func (js *jetStream) isEnabled() bool { if js == nil { return false } - js.mu.RLock() - defer js.mu.RUnlock() - return !js.disabled + return !js.disabled.Load() } // Mark that we will be in standlone mode. @@ -821,9 +810,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { } js.mu.Lock() defer js.mu.Unlock() - js.standAlone = isStandAlone - - if isStandAlone { + if js.standAlone = isStandAlone; js.standAlone { + // Update our server atomic. + js.srv.isMetaLeader.Store(true) js.accountPurge, _ = js.srv.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, nil, js.srv.jsLeaderAccountPurgeRequest) } else if js.accountPurge != nil { js.srv.sysUnsubscribe(js.accountPurge) @@ -832,11 +821,7 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { - var js *jetStream - s.mu.RLock() - js = s.js - s.mu.RUnlock() - return js.isEnabled() + return s.getJetStream().isEnabled() } // JetStreamEnabledForDomain will report if any servers have JetStream enabled within this domain. @@ -909,10 +894,7 @@ func (js *jetStream) isShuttingDown() bool { // Shutdown jetstream for this server. func (s *Server) shutdownJetStream() { - s.mu.RLock() - js := s.js - s.mu.RUnlock() - + js := s.getJetStream() if js == nil { return } @@ -951,9 +933,7 @@ func (s *Server) shutdownJetStream() { a.removeJetStream() } - s.mu.Lock() - s.js = nil - s.mu.Unlock() + s.js.Store(nil) js.mu.Lock() js.accounts = nil @@ -994,23 +974,20 @@ func (s *Server) shutdownJetStream() { // created a dynamic configuration. A copy is returned. func (s *Server) JetStreamConfig() *JetStreamConfig { var c *JetStreamConfig - s.mu.Lock() - if s.js != nil { - copy := s.js.config + if js := s.getJetStream(); js != nil { + copy := js.config c = &(copy) } - s.mu.Unlock() return c } // StoreDir returns the current JetStream directory. func (s *Server) StoreDir() string { - s.mu.Lock() - defer s.mu.Unlock() - if s.js == nil { + js := s.getJetStream() + if js == nil { return _EMPTY_ } - return s.js.config.StoreDir + return js.config.StoreDir } // JetStreamNumAccounts returns the number of enabled accounts this server is tracking. @@ -1036,10 +1013,7 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) { } func (s *Server) getJetStream() *jetStream { - s.mu.RLock() - js := s.js - s.mu.RUnlock() - return js + return s.js.Load() } func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index dfade67d..d930298b 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2314,14 +2314,15 @@ func (s *Server) peerSetToNames(ps []string) []string { // looks up the peer id for a given server name. Cluster and domain name are optional filter criteria func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string { js.mu.RLock() - cc := js.cluster defer js.mu.RUnlock() - for _, p := range cc.meta.Peers() { - si, ok := s.nodeToInfo.Load(p.ID) - if ok && si.(nodeInfo).name == serverName { - if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster { - if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain { - return p.ID + if cc := js.cluster; cc != nil { + for _, p := range cc.meta.Peers() { + si, ok := s.nodeToInfo.Load(p.ID) + if ok && si.(nodeInfo).name == serverName { + if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster { + if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain { + return p.ID + } } } } @@ -4217,11 +4218,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } // We have a consumer assignment. js.mu.RLock() - - var node RaftNode - var leaderNotPartOfGroup bool - var isMember bool - + var ( + node RaftNode + leaderNotPartOfGroup bool + isMember bool + ) rg := ca.Group if rg != nil && rg.isMember(ourID) { isMember = true @@ -4233,6 +4234,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } } js.mu.RUnlock() + // Check if we should ignore all together. if node == nil { // We have been assigned but have not created a node yet. If we are a member return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b5c02bd5..92e70501 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -202,10 +202,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { return nil, nil } - s.mu.RLock() - js := s.js - s.mu.RUnlock() - + js := s.getJetStream() if js == nil { return nil, nil } @@ -223,13 +220,7 @@ func (s *Server) JetStreamIsClustered() bool { } func (s *Server) JetStreamIsLeader() bool { - js := s.getJetStream() - if js == nil { - return false - } - js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isLeader() + return s.isMetaLeader.Load() } func (s *Server) JetStreamIsCurrent() bool { @@ -237,9 +228,20 @@ func (s *Server) JetStreamIsCurrent() bool { if js == nil { return false } + // Grab what we need and release js lock. js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isCurrent() + var meta RaftNode + cc := js.cluster + if cc != nil { + meta = cc.meta + } + js.mu.RUnlock() + + if cc == nil { + // Non-clustered mode + return true + } + return meta.Current() } func (s *Server) JetStreamSnapshotMeta() error { @@ -385,19 +387,6 @@ func (cc *jetStreamCluster) isLeader() bool { return cc.meta != nil && cc.meta.Leader() } -// isCurrent will determine if this node is a leader or an up to date follower. -// Read lock should be held. -func (cc *jetStreamCluster) isCurrent() bool { - if cc == nil { - // Non-clustered mode - return true - } - if cc.meta == nil { - return false - } - return cc.meta.Current() -} - // isStreamCurrent will determine if the stream is up to date. // For R1 it will make sure the stream is present on this server. // Read lock should be held. @@ -647,9 +636,8 @@ func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) { if js == nil { return nil, nil, nil } - js.mu.RLock() + // Lock not needed, set on creation. s := js.srv - js.mu.RUnlock() return s, js, jsa } @@ -860,10 +848,8 @@ func (js *jetStream) getMetaGroup() RaftNode { } func (js *jetStream) server() *Server { - js.mu.RLock() - s := js.srv - js.mu.RUnlock() - return s + // Lock not needed, only set once on creation. + return js.srv } // Will respond if we do not think we have a metacontroller leader. @@ -1241,6 +1227,7 @@ func (js *jetStream) monitorCluster() { // Make sure to stop the raft group on exit to prevent accidental memory bloat. defer n.Stop() + defer s.isMetaLeader.Store(false) const compactInterval = time.Minute t := time.NewTicker(compactInterval) @@ -1728,6 +1715,11 @@ func (js *jetStream) processAddPeer(peer string) { } func (js *jetStream) processRemovePeer(peer string) { + // We may be already disabled. + if js == nil || js.disabled.Load() { + return + } + js.mu.Lock() s, cc := js.srv, js.cluster if cc == nil || cc.meta == nil { @@ -1737,14 +1729,8 @@ func (js *jetStream) processRemovePeer(peer string) { isLeader := cc.isLeader() // All nodes will check if this is them. isUs := cc.meta.ID() == peer - disabled := js.disabled js.mu.Unlock() - // We may be already disabled. - if disabled { - return - } - if isUs { s.Errorf("JetStream being DISABLED, our server was removed from the cluster") adv := &JSServerRemovedAdvisory{ @@ -5282,21 +5268,31 @@ func (js *jetStream) stopUpdatesSub() { } func (js *jetStream) processLeaderChange(isLeader bool) { + if js == nil { + return + } + s := js.srv + if s == nil { + return + } + // Update our server atomic. + s.isMetaLeader.Store(isLeader) + if isLeader { - js.srv.Noticef("Self is new JetStream cluster metadata leader") + s.Noticef("Self is new JetStream cluster metadata leader") } else { var node string if meta := js.getMetaGroup(); meta != nil { node = meta.GroupLeader() } if node == _EMPTY_ { - js.srv.Noticef("JetStream cluster no metadata leader") + s.Noticef("JetStream cluster no metadata leader") } else if srv := js.srv.serverNameForNode(node); srv == _EMPTY_ { - js.srv.Noticef("JetStream cluster new remote metadata leader") + s.Noticef("JetStream cluster new remote metadata leader") } else if clst := js.srv.clusterNameForNode(node); clst == _EMPTY_ { - js.srv.Noticef("JetStream cluster new metadata leader: %s", srv) + s.Noticef("JetStream cluster new metadata leader: %s", srv) } else { - js.srv.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) + s.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) } } @@ -5317,7 +5313,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) { for acc, asa := range cc.streams { for _, sa := range asa { if sa.Sync == _EMPTY_ { - js.srv.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name) + s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name) nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client} nsa.Sync = syncSubjForStream() cc.meta.Propose(encodeUpdateStreamAssignment(nsa)) @@ -8141,19 +8137,18 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { s := js.srv if rg == nil || rg.node == nil { return &ClusterInfo{ - Name: s.ClusterName(), + Name: s.cachedClusterName(), Leader: s.Name(), } } - n := rg.node + n := rg.node ci := &ClusterInfo{ - Name: s.ClusterName(), + Name: s.cachedClusterName(), Leader: s.serverNameForNode(n.GroupLeader()), } now := time.Now() - id, peers := n.ID(), n.Peers() // If we are leaderless, do not suppress putting us in the peer list. @@ -8274,7 +8269,7 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ func (mset *stream) processClusterStreamInfoRequest(reply string) { mset.mu.RLock() - sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg + sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg isLeader := mset.isLeader() mset.mu.RUnlock() diff --git a/server/monitor.go b/server/monitor.go index 7a01cfe2..66f5e81a 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1465,14 +1465,14 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { // We want to do that outside of the lock. pse.ProcUsage(&pcpu, &rss, &vss) - s.mu.Lock() - js := s.js + s.mu.RLock() // We need to create a new instance of Varz (with no reference // whatsoever to anything stored in the server) since the user // has access to the returned value. v := s.createVarz(pcpu, rss) - s.mu.Unlock() - if js != nil { + s.mu.RUnlock() + + if js := s.getJetStream(); js != nil { s.updateJszVarz(js, &v.JetStream, true) } @@ -1798,7 +1798,6 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { // Use server lock to create/update the server's varz object. s.mu.Lock() var created bool - js := s.js s.httpReqStats[VarzPath]++ if s.varz == nil { s.varz = s.createVarz(pcpu, rss) @@ -1809,19 +1808,20 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() // Since locking is jetStream -> Server, need to update jetstream // varz outside of server lock. - if js != nil { + + if js := s.getJetStream(); js != nil { var v JetStreamVarz // Work on stack variable s.updateJszVarz(js, &v, created) // Now update server's varz - s.mu.Lock() + s.mu.RLock() sv := &s.varz.JetStream if created { sv.Config = v.Config } sv.Stats = v.Stats sv.Meta = v.Meta - s.mu.Unlock() + s.mu.RUnlock() } // Do the marshaling outside of server lock, but under varzMu lock. @@ -2835,10 +2835,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } jsa.mu.RUnlock() - if optStreams { + if js := s.getJetStream(); js != nil && optStreams { for _, stream := range streams { rgroup := stream.raftGroup() - ci := s.js.clusterInfo(rgroup) + ci := js.clusterInfo(rgroup) var cfg *StreamConfig if optCfg { c := stream.config() @@ -2884,7 +2884,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { - if s.js == nil { + js := s.getJetStream() + if js == nil { return nil, fmt.Errorf("jetstream not enabled") } acc := opts.Account @@ -2892,9 +2893,9 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not found", acc) } - s.js.mu.RLock() - jsa, ok := s.js.accounts[account.(*Account).Name] - s.js.mu.RUnlock() + js.mu.RLock() + jsa, ok := js.accounts[account.(*Account).Name] + js.mu.RUnlock() if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } @@ -2916,7 +2917,7 @@ func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo { Peers: peerList, node: node, } - return s.js.clusterInfo(group) + return s.getJetStream().clusterInfo(group) } // Jsz returns a Jsz structure containing information about JetStream. diff --git a/server/raft.go b/server/raft.go index f7a05353..5bcc2a1a 100644 --- a/server/raft.go +++ b/server/raft.go @@ -134,6 +134,7 @@ type raft struct { track bool werr error state RaftState + isLeader atomic.Bool hh hash.Hash64 snapfile string csz int @@ -1158,14 +1159,12 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) { } // Leader returns if we are the leader for our group. +// We use an atomic here now vs acquiring the read lock. func (n *raft) Leader() bool { if n == nil { return false } - n.RLock() - isLeader := n.state == Leader - n.RUnlock() - return isLeader + return n.isLeader.Load() } func (n *raft) isCatchingUp() bool { @@ -1688,8 +1687,7 @@ func (n *raft) run() { // We want to wait for some routing to be enabled, so we will wait for // at least a route, leaf or gateway connection to be established before // starting the run loop. - gw := s.gateway - for { + for gw := s.gateway; ; { s.mu.Lock() ready := s.numRemotes()+len(s.leafs) > 0 if !ready && gw.enabled { @@ -3831,6 +3829,9 @@ func (n *raft) quorumNeeded() int { // Lock should be held. func (n *raft) updateLeadChange(isLeader bool) { + // Update our atomic about being the leader. + n.isLeader.Store(isLeader) + // We don't care about values that have not been consumed (transitory states), // so we dequeue any state that is pending and push the new one. for { diff --git a/server/reload.go b/server/reload.go index 516b52d3..23988171 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1873,7 +1873,7 @@ func (s *Server) reloadAuthorization() { awcsti, _ = s.configureAccounts(true) s.configureAuthorization() // Double check any JetStream configs. - checkJetStream = s.js != nil + checkJetStream = s.getJetStream() != nil } else if opts.AccountResolver != nil { s.configureResolver() if _, ok := s.accResolver.(*MemAccResolver); ok { diff --git a/server/server.go b/server/server.go index 7ea3000a..d1d0d109 100644 --- a/server/server.go +++ b/server/server.go @@ -137,7 +137,8 @@ type Server struct { listenerErr error gacc *Account sys *internal - js *jetStream + js atomic.Pointer[jetStream] + isMetaLeader atomic.Bool accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32