diff --git a/server/accounts.go b/server/accounts.go
index 1a865ff2..399a5468 100644
--- a/server/accounts.go
+++ b/server/accounts.go
@@ -2313,7 +2313,9 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
 	// Always grab time and make sure response threshold timer is running.
 	si.ts = time.Now().UnixNano()
-	osi.se.setResponseThresholdTimer()
+	if osi.se != nil {
+		osi.se.setResponseThresholdTimer()
+	}
 	if rt == Singleton && tracking {
 		si.latency = osi.latency
@@ -3922,7 +3924,10 @@ func (dr *DirAccResolver) Start(s *Server) error {
 			return fmt.Errorf("error setting up update handling: %v", err)
 		}
 	}
-	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, _ *Account, subj, resp string, msg []byte) {
+	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, c *client, _ *Account, _, resp string, msg []byte) {
+		// As this is a raw message, we need to extract the payload and decode the claims
+		// from it alone, in case the request was sent with headers.
+		_, msg = c.msgParts(msg)
 		if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
 			respondToUpdate(s, resp, "n/a", "jwt update resulted in error", err)
 		} else if claim.Issuer == op && strict {
@@ -4210,7 +4215,10 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
 			return fmt.Errorf("error setting up update handling: %v", err)
 		}
 	}
-	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, _ *Account, subj, resp string, msg []byte) {
+	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, c *client, _ *Account, _, resp string, msg []byte) {
+		// As this is a raw message, we need to extract the payload and decode the claims
+		// from it alone, in case the request was sent with headers.
+		_, msg = c.msgParts(msg)
 		if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
 			respondToUpdate(s, resp, "n/a", "jwt update cache resulted in error", err)
 		} else if claim.Issuer == op && strict {
diff --git a/server/consumer.go b/server/consumer.go
index d120d4c5..775ea444 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1198,8 +1198,9 @@ func (o *consumer) setLeader(isLeader bool) {
 		}
 	}
 
+// This is coming on the wire so do not block here.
 func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
-	o.infoWithSnapAndReply(false, reply)
+	go o.infoWithSnapAndReply(false, reply)
 }
 
 // Lock should be held.
@@ -1438,7 +1439,7 @@ func (o *consumer) deleteNotActive() {
 	// Check to make sure we went away.
 	// Don't think this needs to be a monitored go routine.
 	go func() {
-		ticker := time.NewTicker(time.Second)
+		ticker := time.NewTicker(10 * time.Second)
 		defer ticker.Stop()
 		for range ticker.C {
 			js.mu.RLock()
@@ -2193,12 +2194,9 @@ func (o *consumer) checkRedelivered(slseq uint64) {
 		}
 	}
 	if shouldUpdateState {
-		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
+		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			// Can not hold lock while gather information about account and stream below.
-			o.mu.Unlock()
-			s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
-			o.mu.Lock()
+			s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
 		}
 	}
 }
@@ -3151,26 +3149,41 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 		}
 		o.sseq++
 		return pmsg, 1, err
-	}
+	// Hold onto this since we release the lock.
+	store := o.mset.store
+
+	// Grab next message applicable to us.
+	// We will unlock here in case lots of contention, e.g. WQ.
+	o.mu.Unlock()
+	pmsg := getJSPubMsgFromPool()
+	sm, sseq, err := store.LoadNextMsg(filter, filterWC, seq, &pmsg.StoreMsg)
+	if sm == nil {
+		pmsg.returnToPool()
+		pmsg, dc = nil, 0
+	}
+	o.mu.Lock()
+	// If no filters are specified, optimize to fetch just non-filtered messages.
 	if o.subjf == nil {
 		// Grab next message applicable to us.
+		// We will unlock here in case lots of contention, e.g. WQ.
+		o.mu.Unlock()
 		pmsg := getJSPubMsgFromPool()
-		sm, sseq, err := o.mset.store.LoadNextMsg("", false, o.sseq, &pmsg.StoreMsg)
-
+		sm, sseq, err := store.LoadNextMsg(_EMPTY_, false, o.sseq, &pmsg.StoreMsg)
+		if sm == nil {
+			pmsg.returnToPool()
+			pmsg, dc = nil, 0
+		}
+		o.mu.Lock()
 		if sseq >= o.sseq {
 			o.sseq = sseq + 1
 			if err == ErrStoreEOF {
 				o.updateSkipped(o.sseq)
 			}
 		}
-		if sm == nil {
-			pmsg.returnToPool()
-			return nil, 0, err
-		}
-		return pmsg, 1, err
+		return pmsg, dc, err
 	}
 
 	var lastErr error
@@ -3183,8 +3196,12 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 		}
 		// if this subject didn't fetch any message before, do it now
 		if filter.pmsg == nil {
+			// We will unlock here in case lots of contention, e.g. WQ.
+			filterSubject, filterWC, nextSeq := filter.subject, filter.hasWildcard, filter.nextSeq
+			o.mu.Unlock()
 			pmsg := getJSPubMsgFromPool()
-			sm, sseq, err := o.mset.store.LoadNextMsg(filter.subject, filter.hasWildcard, filter.nextSeq, &pmsg.StoreMsg)
+			sm, sseq, err := store.LoadNextMsg(filterSubject, filterWC, nextSeq, &pmsg.StoreMsg)
+			o.mu.Lock()
 
 			filter.err = err
@@ -3203,7 +3220,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 	}
 
-	// Don't sosrt the o.subjf if it's only one entry
+	// Don't sort the o.subjf if it's only one entry
 	// Sort uses `reflect` and can noticeably slow down fetching,
 	// even if len == 0 or 1.
 	// TODO(tp): we should have sort based off generics for server
@@ -3235,7 +3252,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 	}
 
 	return nil, 0, lastErr
-
 }
 
 // Will check for expiration and lack of interest on waiting requests.
@@ -4010,14 +4026,22 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {
 
 // Checks the pending messages.
 func (o *consumer) checkPending() {
-	o.mu.Lock()
-	defer o.mu.Unlock()
-
+	o.mu.RLock()
 	mset := o.mset
 	// On stop, mset and timer will be nil.
 	if mset == nil || o.ptmr == nil {
+		o.mu.RUnlock()
 		return
 	}
+	o.mu.RUnlock()
+
+	var shouldUpdateState bool
+	var state StreamState
+	mset.store.FastState(&state)
+	fseq := state.FirstSeq
+
+	o.mu.Lock()
+	defer o.mu.Unlock()
 
 	now := time.Now().UnixNano()
 	ttl := int64(o.cfg.AckWait)
@@ -4028,11 +4052,6 @@ func (o *consumer) checkPending() {
 		next = int64(o.cfg.BackOff[l-1])
 	}
 
-	var shouldUpdateState bool
-	var state StreamState
-	mset.store.FastState(&state)
-	fseq := state.FirstSeq
-
 	// Since we can update timestamps, we have to review all pending.
 	// We will now bail if we see an ack pending in bound to us via o.awl.
 	var expired []uint64
@@ -4099,7 +4118,12 @@ func (o *consumer) checkPending() {
 	}
 
 	if len(o.pending) > 0 {
-		o.ptmr.Reset(o.ackWait(time.Duration(next)))
+		delay := time.Duration(next)
+		if o.ptmr == nil {
+			o.ptmr = time.AfterFunc(delay, o.checkPending)
+		} else {
+			o.ptmr.Reset(o.ackWait(delay))
+		}
 	} else {
 		// Make sure to stop timer and clear out any re delivery queues
 		stopAndClearTimer(&o.ptmr)
@@ -4109,12 +4133,9 @@ func (o *consumer) checkPending() {
 
 	// Update our state if needed.
 	if shouldUpdateState {
-		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
+		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			// Can not hold lock while gather information about account and stream below.
-			o.mu.Unlock()
-			s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
-			o.mu.Lock()
+			s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
 		}
 	}
 }
diff --git a/server/filestore.go b/server/filestore.go
index 00fa70ab..7e1725b4 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -270,9 +270,9 @@ const (
 	// Check for bad record length value due to corrupt data.
 	rlBadThresh = 32 * 1024 * 1024
 	// Time threshold to write index info.
-	wiThresh = int64(2 * time.Second)
+	wiThresh = int64(30 * time.Second)
 	// Time threshold to write index info for non FIFO cases
-	winfThresh = int64(500 * time.Millisecond)
+	winfThresh = int64(2 * time.Second)
 )
 
 func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
@@ -1192,7 +1192,7 @@ func (fs *fileStore) recoverMsgs() error {
 		if mb.msgs == 0 && mb.rbytes == 0 {
 			if mb == fs.lmb {
 				mb.first.seq, mb.first.ts = mb.last.seq+1, 0
-				mb.closeAndKeepIndex()
+				mb.closeAndKeepIndex(false)
 			} else {
 				emptyBlks = append(emptyBlks, mb)
 			}
@@ -1263,7 +1263,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
 		if mb == fs.lmb {
 			// Do this part by hand since not deleting one by one.
 			mb.first.seq, mb.first.ts = mb.last.seq+1, 0
-			mb.closeAndKeepIndex()
+			mb.closeAndKeepIndex(false)
 			// Clear any global subject state.
 			fs.psim = make(map[string]*psi)
 			return false
@@ -2282,12 +2282,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
 			if fseq == 0 {
 				fseq, _ = fs.firstSeqForSubj(subj)
 			}
-			if ok, _ := fs.removeMsg(fseq, false, false); ok {
+			if ok, _ := fs.removeMsgViaLimits(fseq); ok {
 				// Make sure we are below the limit.
 				if psmc--; psmc >= mmp {
 					for info, ok := fs.psim[subj]; ok && info.total > mmp; info, ok = fs.psim[subj] {
 						if seq, _ := fs.firstSeqForSubj(subj); seq > 0 {
-							if ok, _ := fs.removeMsg(seq, false, false); !ok {
+							if ok, _ := fs.removeMsgViaLimits(seq); !ok {
 								break
 							}
 						} else {
@@ -2539,7 +2539,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
 			m, _, err := mb.firstMatching(subj, false, seq, &sm)
 			if err == nil {
 				seq = m.seq + 1
-				if removed, _ := fs.removeMsg(m.seq, false, false); removed {
+				if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
 					total--
 					blks[mb] = struct{}{}
 				}
@@ -2560,17 +2560,24 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
 
 // Lock should be held.
 func (fs *fileStore) deleteFirstMsg() (bool, error) {
-	return fs.removeMsg(fs.state.FirstSeq, false, false)
+	return fs.removeMsgViaLimits(fs.state.FirstSeq)
+}
+
+// If we remove via limits, which can always be recovered on a restart, we
+// do not force the system to update the index file.
+// Lock should be held.
+func (fs *fileStore) removeMsgViaLimits(seq uint64) (bool, error) {
+	return fs.removeMsg(seq, false, true, false)
 }
 
 // RemoveMsg will remove the message from this store.
 // Will return the number of bytes removed.
 func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) {
-	return fs.removeMsg(seq, false, true)
+	return fs.removeMsg(seq, false, false, true)
 }
 
 func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
-	return fs.removeMsg(seq, true, true)
+	return fs.removeMsg(seq, true, false, true)
 }
 
 // Convenience function to remove per subject tracking at the filestore level.
@@ -2590,7 +2597,7 @@ func (fs *fileStore) removePerSubject(subj string) {
 }
 
 // Remove a message, optionally rewriting the mb file.
-func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
+func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (bool, error) {
 	if seq == 0 {
 		return false, ErrStoreMsgNotFound
 	}
@@ -2695,7 +2702,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
 	fifo := seq == mb.first.seq
 	isLastBlock := mb == fs.lmb
 	isEmpty := mb.msgs == 0
-	shouldWriteIndex := !isEmpty
+	// If we are removing the message via limits we do not need to write the index file here.
+	// If viaLimits, a restart will properly clean up these messages regardless.
+	shouldWriteIndex := !isEmpty && !viaLimits
 
 	if fifo {
 		mb.selectNextFirst()
@@ -2724,10 +2733,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
 
 	var firstSeqNeedsUpdate bool
 
-	// Decide how we want to clean this up. If last block we will hold into index.
+	// Decide how we want to clean this up. If last block and the only block left we will hold onto the index.
 	if isEmpty {
 		if isLastBlock {
-			mb.closeAndKeepIndex()
+			mb.closeAndKeepIndex(viaLimits)
+			// We do not need to writeIndex since we just did so above.
+			shouldWriteIndex = false
 		} else {
 			fs.removeMsgBlock(mb)
 		}
@@ -3441,7 +3452,9 @@ func (fs *fileStore) expireMsgs() {
 	fs.mu.RUnlock()
 
 	for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
-		fs.removeMsg(sm.seq, false, true)
+		fs.mu.Lock()
+		fs.removeMsgViaLimits(sm.seq)
+		fs.mu.Unlock()
 		// Recalculate in case we are expiring a bunch.
 		minAge = time.Now().UnixNano() - maxAge
 	}
@@ -3788,7 +3801,7 @@ func (fs *fileStore) syncBlocks() {
 			mb.ifd.Truncate(mb.liwsz)
 			mb.ifd.Sync()
 		}
-		// See if we can close FDs do to being idle.
+		// See if we can close FDs due to being idle.
 		if mb.ifd != nil || mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
 			mb.dirtyCloseWithRemove(false)
 		}
@@ -4241,6 +4254,7 @@ var (
 	errNoMsgBlk      = errors.New("no message block")
 	errMsgBlkTooBig  = errors.New("message block size exceeded int capacity")
 	errUnknownCipher = errors.New("unknown cipher")
+	errDIOStalled    = errors.New("IO is stalled")
 )
 
 // Used for marking messages that have had their checksums checked.
@@ -4755,7 +4769,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
 	}
 
 	// Check if this will be a short write, and if so truncate before writing here.
-	if int64(len(buf)) < mb.liwsz {
+	// We only really need to truncate if we are encrypted or we have dmap entries.
+	// If no dmap entries, readIndexInfo does the right thing in the presence of extra data left over.
+	if int64(len(buf)) < mb.liwsz && (mb.aek != nil || len(mb.dmap) > 0) {
 		if err := mb.ifd.Truncate(0); err != nil {
 			mb.werr = err
 			return err
@@ -4770,7 +4786,6 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
 	} else {
 		mb.werr = err
 	}
-
 	return err
 }
@@ -5525,7 +5540,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
 
 // When we have an empty block but want to keep the index for timestamp info etc.
 // Lock should be held.
-func (mb *msgBlock) closeAndKeepIndex() {
+func (mb *msgBlock) closeAndKeepIndex(viaLimits bool) {
 	// We will leave a 0 length blk marker.
 	if mb.mfd != nil {
 		mb.mfd.Truncate(0)
@@ -5900,13 +5915,18 @@ func (mb *msgBlock) writePerSubjectInfo() error {
 	b.Write(mb.lchk[:])
 
 	// Gate this for when we have a large number of blocks expiring at the same time.
-	<-dios
-	err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
-	dios <- struct{}{}
-
-	// Clear write flag if no error.
-	if err == nil {
-		mb.fssNeedsWrite = false
+	// Since we have the lock we would rather fail here than block.
+	// This is an optional structure that can be rebuilt on restart.
+	var err error
+	select {
+	case <-dios:
+		if err = os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms); err == nil {
+			// Clear write flag if no error.
+			mb.fssNeedsWrite = false
+		}
+		dios <- struct{}{}
+	default:
+		err = errDIOStalled
 	}
 
 	return err
@@ -6796,8 +6816,8 @@ var dios chan struct{}
 
 // Used to setup our simplistic counting semaphore using buffered channels.
 // golang.org's semaphore seemed a bit heavy.
 func init() {
-	// Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls.
-	const nIO = 1024
+	// Limit ourselves to a max of 4 blocking IO calls.
+	const nIO = 4
 	dios = make(chan struct{}, nIO)
 	// Fill it up to start.
 	for i := 0; i < nIO; i++ {
diff --git a/server/jetstream.go b/server/jetstream.go
index 229870bf..dd511e2a 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -788,9 +788,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
 
 // JetStreamEnabled reports if jetstream is enabled for this server.
 func (s *Server) JetStreamEnabled() bool {
 	var js *jetStream
-	s.mu.Lock()
+	s.mu.RLock()
 	js = s.js
-	s.mu.Unlock()
+	s.mu.RUnlock()
 	return js.isEnabled()
 }
@@ -857,9 +857,9 @@ func (s *Server) signalPullConsumers() {
 
 // Shutdown jetstream for this server.
 func (s *Server) shutdownJetStream() {
-	s.mu.Lock()
+	s.mu.RLock()
 	js := s.js
-	s.mu.Unlock()
+	s.mu.RUnlock()
 
 	if js == nil {
 		return
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 00f935e1..2cbfc8a9 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -198,21 +198,15 @@ func (s *Server) trackedJetStreamServers() (js, total int) {
 
 func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
 	s.mu.RLock()
-	shutdown := s.shutdown
-	js := s.js
+	shutdown, js := s.shutdown, s.js
 	s.mu.RUnlock()
 
 	if shutdown || js == nil {
 		return nil, nil
 	}
 
-	js.mu.RLock()
-	cc := js.cluster
-	js.mu.RUnlock()
-	if cc == nil {
-		return nil, nil
-	}
-	return js, cc
+	// Only set once, do not need a lock.
+	return js, js.cluster
 }
 
 func (s *Server) JetStreamIsClustered() bool {
@@ -597,10 +591,8 @@ func (s *Server) enableJetStreamClustering() error {
 
 // isClustered returns if we are clustered.
 // Lock should not be held.
 func (js *jetStream) isClustered() bool {
-	js.mu.RLock()
-	isClustered := js.cluster != nil
-	js.mu.RUnlock()
-	return isClustered
+	// This is only ever set, no need for lock here.
+	return js.cluster != nil
 }
 
 // isClusteredNoLock returns if we are clustered, but unlike isClustered() does
@@ -989,6 +981,7 @@ func (js *jetStream) monitorCluster() {
 		isLeader     bool
 		lastSnap     []byte
 		lastSnapTime time.Time
+		minSnapDelta = 10 * time.Second
 	)
 
 	// Highwayhash key for generating hashes.
@@ -1028,8 +1021,8 @@ func (js *jetStream) monitorCluster() {
 		case <-rqch:
 			return
 		case <-qch:
-			// Clean signal from shutdown routine so attempt to snapshot meta layer.
-			doSnapshot()
+			// Clean signal from shutdown routine so do a best-effort attempt to snapshot the meta layer.
+			n.InstallSnapshot(js.metaSnapshot())
 			// Return the signal back since shutdown will be waiting.
 			close(qch)
 			return
@@ -1039,7 +1032,6 @@ func (js *jetStream) monitorCluster() {
 			if ce == nil {
 				// Signals we have replayed all of our metadata.
 				js.clearMetaRecovering()
-
 				// Process any removes that are still valid after recovery.
 				for _, ca := range ru.removeConsumers {
 					js.processConsumerRemoval(ca)
 				}
 				for _, sa := range ru.removeStreams {
 					js.processStreamRemoval(sa)
 				}
-				// Same with any pending updates.
+				// Process pending updates.
 				for _, sa := range ru.updateStreams {
 					js.processUpdateStreamAssignment(sa)
 				}
+				// Now consumers.
 				for _, ca := range ru.updateConsumers {
 					js.processConsumerAssignment(ca)
 				}
@@ -1060,22 +1053,13 @@ func (js *jetStream) monitorCluster() {
 				continue
 			}
 			// FIXME(dlc) - Deal with errors.
-			if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
+			if didSnap, didStreamRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
 				_, nb := n.Applied(ce.Index)
-				// If we processed a snapshot and are recovering remove our pending state.
-				if didSnap && js.isMetaRecovering() {
-					ru = &recoveryUpdates{
-						removeStreams:   make(map[string]*streamAssignment),
-						removeConsumers: make(map[string]*consumerAssignment),
-						updateStreams:   make(map[string]*streamAssignment),
-						updateConsumers: make(map[string]*consumerAssignment),
-					}
-				}
-				if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval {
+				if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
 					// Since we received one make sure we have our own since we do not store
 					// our meta state outside of raft.
 					doSnapshot()
-				} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > 5*time.Second {
+				} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
					doSnapshot()
				}
			}
@@ -1213,7 +1197,7 @@ func (js *jetStream) metaSnapshot() []byte {
 	return s2.EncodeBetter(nil, b)
 }
 
-func (js *jetStream) applyMetaSnapshot(buf []byte) error {
+func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
 	var wsas []writeableStreamAssignment
 	if len(buf) > 0 {
 		jse, err := s2.Decode(nil, buf)
@@ -1275,7 +1259,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
 			for _, ca := range sa.consumers {
 				caAdd = append(caAdd, ca)
 			}
-
 			if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil {
 				for _, ca := range osa.consumers {
 					if sa.consumers[ca.Name] == nil {
@@ -1291,13 +1274,21 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
 	// Do removals first.
 	for _, sa := range saDel {
 		js.setStreamAssignmentRecovering(sa)
-		js.processStreamRemoval(sa)
+
+		if isRecovering {
+			key := sa.recoveryKey()
+			ru.removeStreams[key] = sa
+			delete(ru.updateStreams, key)
+		} else {
+			js.processStreamRemoval(sa)
+		}
 	}
 	// Now do add for the streams. Also add in all consumers.
 	for _, sa := range saAdd {
 		js.setStreamAssignmentRecovering(sa)
 		js.processStreamAssignment(sa)
+
-		// We can simply add the consumers.
+		// We can simply process the consumers.
 		for _, ca := range sa.consumers {
 			js.setConsumerAssignmentRecovering(ca)
 			js.processConsumerAssignment(ca)
@@ -1308,17 +1299,35 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
 	// sure to process any changes.
 	for _, sa := range saChk {
 		js.setStreamAssignmentRecovering(sa)
-		js.processUpdateStreamAssignment(sa)
+		if isRecovering {
+			key := sa.recoveryKey()
+			ru.updateStreams[key] = sa
+			delete(ru.removeStreams, key)
+		} else {
+			js.processUpdateStreamAssignment(sa)
+		}
 	}
 	// Now do the deltas for existing stream's consumers.
 	for _, ca := range caDel {
 		js.setConsumerAssignmentRecovering(ca)
-		js.processConsumerRemoval(ca)
+		if isRecovering {
+			key := ca.recoveryKey()
+			ru.removeConsumers[key] = ca
+			delete(ru.updateConsumers, key)
+		} else {
+			js.processConsumerRemoval(ca)
+		}
 	}
 	for _, ca := range caAdd {
 		js.setConsumerAssignmentRecovering(ca)
-		js.processConsumerAssignment(ca)
+		if isRecovering {
+			key := ca.recoveryKey()
+			delete(ru.removeConsumers, key)
+			ru.updateConsumers[key] = ca
+		} else {
+			js.processConsumerAssignment(ca)
+		}
 	}
 
 	return nil
@@ -1536,7 +1545,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 	for _, e := range entries {
 		if e.Type == EntrySnapshot {
-			js.applyMetaSnapshot(e.Data)
+			js.applyMetaSnapshot(e.Data, ru, isRecovering)
 			didSnap = true
 		} else if e.Type == EntryRemovePeer {
 			if !isRecovering {
@@ -1843,7 +1852,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		compactInterval = 2 * time.Minute
 		compactSizeMin  = 8 * 1024 * 1024
 		compactNumMin   = 65536
-		minSnapDelta    = 5 * time.Second
+		minSnapDelta    = 10 * time.Second
 	)
 
 	// Spread these out for large numbers on server restart.
@@ -3141,9 +3150,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
 		mset.setStreamAssignment(sa)
 		if err = mset.updateWithAdvisory(sa.Config, false); err != nil {
 			s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
-			// Process the raft group and make sure it's running if needed.
-			js.createRaftGroup(acc.GetName(), osa.Group, storage)
-			mset.setStreamAssignment(osa)
+			if osa != nil {
+				// Process the raft group and make sure it's running if needed.
+				js.createRaftGroup(acc.GetName(), osa.Group, storage)
+				mset.setStreamAssignment(osa)
+			}
 			if rg.node != nil {
 				rg.node.Delete()
 				rg.node = nil
@@ -4010,7 +4021,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 		compactInterval = 2 * time.Minute
 		compactSizeMin  = 64 * 1024 // What is stored here is always small for consumers.
 		compactNumMin   = 1024
-		minSnapDelta    = 5 * time.Second
+		minSnapDelta    = 10 * time.Second
 	)
 
 	// Spread these out for large numbers on server restart.
@@ -6772,7 +6783,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
 	s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
 	maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
-	isLeader := mset.isLeader()
+	isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
 	mset.mu.RUnlock()
 
 	// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
@@ -6785,6 +6796,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 		return NewJSClusterNotLeaderError()
 	}
 
+	// Bail here if sealed.
+	if isSealed {
+		var resp = JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}, Error: NewJSStreamSealedError()}
+		b, _ := json.Marshal(resp)
+		mset.outq.sendMsg(reply, b)
+		return NewJSStreamSealedError()
+	}
+
 	// Check here pre-emptively if we have exceeded this server limits.
 	if js.limitsExceeded(stype) {
 		s.resourcesExeededError()
@@ -7580,7 +7599,12 @@ func (js *jetStream) streamAlternates(ci *ClientInfo, stream string) []StreamAlt
 	return alts
 }
 
-func (mset *stream) handleClusterStreamInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, _ []byte) {
+// Internal request for stream info, this is coming on the wire so do not block here.
+func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, subject, reply string, _ []byte) {
+	go mset.processClusterStreamInfoRequest(reply)
+}
+
+func (mset *stream) processClusterStreamInfoRequest(reply string) {
 	mset.mu.RLock()
 	sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
 	stype := mset.cfg.Storage
@@ -7615,7 +7639,10 @@ func (mset *stream) handleClusterStreamInfoRequest(sub *subscription, c *client,
 	sysc.sendInternalMsg(reply, _EMPTY_, nil, si)
 }
 
-const defaultMaxTotalCatchupOutBytes = int64(128 * 1024 * 1024) // 128MB for now, for the total server.
+// 64MB for now, for the total server. This is the max we will blast out if asked
+// to do so to another server for purposes of catchups.
+// This number should be ok on a 1Gbit interface.
+const defaultMaxTotalCatchupOutBytes = int64(64 * 1024 * 1024)
 
 // Current total outstanding catchup bytes.
 func (s *Server) gcbTotal() int64 {
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 4b6ec601..03e1e52e 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -2727,7 +2727,7 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
 				name = test.name
 			}
 
-			const msgs = 10_000
+			const msgs = 5_000
 
 			done, count := make(chan bool), 0
 			sub, err := js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
@@ -2740,24 +2740,22 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
 			require_NoError(t, err)
 
 			// This happens only if we start publishing messages after consumer was created.
-			go func() {
+			pubDone := make(chan struct{})
+			go func(subject string) {
 				for i := 0; i < msgs; i++ {
-					js.PublishAsync(test.subject, []byte("DATA"))
+					js.Publish(subject, []byte("DATA"))
 				}
-				select {
-				case <-js.PublishAsyncComplete():
-				case <-time.After(5 * inactiveThreshold):
-				}
-			}()
+				close(pubDone)
+			}(test.subject)
 
 			// Wait for inactive threshold to expire and all messages to be published and received
 			// Bug is we clean up active consumers when we should not.
 			time.Sleep(3 * inactiveThreshold / 2)
 
 			select {
-			case <-done:
-			case <-time.After(5 * time.Second):
-				t.Fatalf("Did not receive done signal")
+			case <-pubDone:
+			case <-time.After(10 * time.Second):
+				t.Fatalf("Did not receive completion signal")
 			}
 
 			info, err := js.ConsumerInfo(test.stream, name)
@@ -2809,9 +2807,9 @@ func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) {
 		_, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond))
 	}
 
-	// Needs to be at least 5 seconds, otherwise we won't hit the
+	// Needs to be at least 10 seconds, otherwise we won't hit the
 	// minSnapDelta that prevents us from snapshotting too often
-	time.Sleep(time.Second * 6)
+	time.Sleep(time.Second * 11)
 
 	for i := 0; i < 1024; i++ {
 		_, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond))
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index 3c06d1c6..8abbe17c 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -2082,14 +2082,9 @@ func TestJetStreamSuperClusterMovingStreamAndMoveBack(t *testing.T) {
 
 	toSend := 10_000
 	for i := 0; i < toSend; i++ {
-		_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
+		_, err := js.Publish("TEST", []byte("HELLO WORLD"))
 		require_NoError(t, err)
 	}
-	select {
-	case <-js.PublishAsyncComplete():
-	case <-time.After(5 * time.Second):
-		t.Fatalf("Did not receive completion signal")
-	}
 
 	_, err = js.UpdateStream(&nats.StreamConfig{
 		Name: "TEST",
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 4e208c0d..aff8acca 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -207,12 +207,10 @@ func TestJetStreamAddStream(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
-	nc.Publish("foo", []byte("Hello World!"))
-	nc.Flush()
-
+	js.Publish("foo", []byte("Hello World!"))
 	state := mset.state()
 	if state.Msgs != 1 {
 		t.Fatalf("Expected 1 message, got %d", state.Msgs)
@@ -221,9 +219,7 @@ func TestJetStreamAddStream(t *testing.T) {
 		t.Fatalf("Expected non-zero bytes")
 	}
 
-	nc.Publish("foo", []byte("Hello World Again!"))
-	nc.Flush()
-
+	js.Publish("foo", []byte("Hello World Again!"))
 	state = mset.state()
 	if state.Msgs != 2 {
 		t.Fatalf("Expected 2 messages, got %d", state.Msgs)
@@ -4959,7 +4955,7 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	sendMsgs := func(toSend int) {
 		for i := 0; i < toSend; i++ {
 			var subj string
 			if i%2 == 0 {
 				subj = "foo.AA"
 			} else {
 				subj = "foo.ZZ"
 			}
-			if err := nc.Publish(subj, []byte("OK!")); err != nil {
-				return
-			}
+			_, err := js.Publish(subj, []byte("OK!"))
+			require_NoError(t, err)
 		}
-		nc.Flush()
 	}
 
 	// Send 50 msgs
@@ -5104,14 +5098,14 @@ func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	// Send lots of msgs and have them queued up.
 	for i := 0; i < 10000; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
+
 	if state := mset.state(); state.Msgs != 10000 {
 		t.Fatalf("Expected %d messages, got %d", 10000, state.Msgs)
 	}
@@ -5263,14 +5257,13 @@ func TestJetStreamRedeliverCount(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	// Send 10 msgs
 	for i := 0; i < 10; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 10 {
 		t.Fatalf("Expected %d messages, got %d", 10, state.Msgs)
 	}
@@ -5412,14 +5405,13 @@ func TestJetStreamCanNotNakAckd(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	// Send 10 msgs
 	for i := 0; i < 10; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 10 {
 		t.Fatalf("Expected %d messages, got %d", 10, state.Msgs)
 	}
@@ -5486,14 +5478,13 @@ func TestJetStreamStreamPurge(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	// Send 100 msgs
 	for i := 0; i < 100; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 100 {
 		t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
 	}
@@ -5508,8 +5499,7 @@ func TestJetStreamStreamPurge(t *testing.T) {
 	}
 	time.Sleep(10 * time.Millisecond)
 	now := time.Now()
-	nc.Publish("DC", []byte("OK!"))
-	nc.Flush()
+	js.Publish("DC", []byte("OK!"))
 
 	state = mset.state()
 	if state.Msgs != 1 {
@@ -5544,14 +5534,13 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	// Send 100 msgs
 	for i := 0; i < 100; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 100 {
 		t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
 	}
@@ -5606,7 +5595,7 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
 		t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer)
 	}
 	// Also make sure we can get new messages correctly.
-	nc.Request("DC", []byte("OK-22"), time.Second)
+	js.Publish("DC", []byte("OK-22"))
 	if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	} else if string(msg.Data) != "OK-22" {
@@ -5635,14 +5624,13 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
 	}
 	defer mset.delete()
 
-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
 	// Send 100 msgs
 	for i := 0; i < 100; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 100 {
 		t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
 	}
@@ -5690,7 +5678,7 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
 		t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer)
 	}
 	// Also make sure we can get new messages correctly.
- nc.Request("DC", []byte("OK-22"), time.Second) + js.Publish("DC", []byte("OK-22")) if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) } else if string(msg.Data) != "OK-22" { @@ -5719,16 +5707,15 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 100 msgs totalMsgs := 100 for i := 0; i < totalMsgs; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() checkNumMsgs := func(numExpected int) { t.Helper() @@ -5764,9 +5751,8 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { mset.addConsumer(&ConsumerConfig{DeliverSubject: sub3.Subject, AckPolicy: AckNone}) for i := 0; i < totalMsgs; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() checkNumMsgs(totalMsgs) @@ -8264,15 +8250,14 @@ func TestJetStreamDeleteMsg(t *testing.T) { t.Fatalf("Unexpected error adding stream: %v", err) } - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() pubTen := func() { t.Helper() for i := 0; i < 10; i++ { - nc.Publish("foo", []byte("Hello World!")) + js.Publish("foo", []byte("Hello World!")) } - nc.Flush() } pubTen() @@ -11786,9 +11771,9 @@ func TestJetStreamServerEncryption(t *testing.T) { } for _, c := range cases { - t.Run(c.name, func(t *testing.T) { tmpl := ` + server_name: S22 listen: 127.0.0.1:-1 jetstream: {key: $JS_KEY, store_dir: '%s' %s} ` @@ -18009,6 +17994,7 @@ func TestJetStreamProperErrorDueToOverlapSubjects(t *testing.T) { func TestJetStreamServerCipherConvert(t *testing.T) { tmpl := ` + server_name: S22 listen: 127.0.0.1:-1 jetstream: {key: s3cr3t, store_dir: '%s', cipher: %s} ` diff --git a/server/jwt_test.go b/server/jwt_test.go index 5df09621..0ffe9044 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -6132,6 +6132,61 @@ func TestJWTAccountProtectedImport(t *testing.T) { }) } +// Headers are ignored in claims update, but passing them should not cause error. +func TestJWTClaimsUpdateWithHeaders(t *testing.T) { + skp, spub := createKey(t) + newUser(t, skp) + + sclaim := jwt.NewAccountClaims(spub) + encodeClaim(t, sclaim, spub) + + akp, apub := createKey(t) + newUser(t, akp) + claim := jwt.NewAccountClaims(apub) + jwtClaim := encodeClaim(t, claim, apub) + + dirSrv := t.TempDir() + + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + } + `, ojwt, spub, dirSrv))) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + type zapi struct { + Server *ServerInfo + Data *Connz + Error *ApiError + } + + sc := natsConnect(t, s.ClientURL(), createUserCreds(t, s, skp)) + defer sc.Close() + // Pass claims update with headers. 
+ msg := &nats.Msg{ + Subject: "$SYS.REQ.CLAIMS.UPDATE", + Data: []byte(jwtClaim), + Header: map[string][]string{"key": {"value"}}, + } + resp, err := sc.RequestMsg(msg, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var cz zapi + if err := json.Unmarshal(resp.Data, &cz); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if cz.Error != nil { + t.Fatalf("Unexpected error: %+v", cz.Error) + } +} + func TestJWTMappings(t *testing.T) { sysKp, syspub := createKey(t) sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) diff --git a/server/leafnode.go b/server/leafnode.go index 9a5b7639..9594d74f 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -888,7 +888,13 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf } now := time.Now().UTC() - c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now} + var clientOpts = ClientOpts{ + Name: opts.ServerName, + Verbose: defaultOpts.Verbose, + Pedantic: defaultOpts.Pedantic, + Echo: defaultOpts.Echo, + } + c := &client{srv: s, nc: conn, kind: LEAF, opts: clientOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now} // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs c.leaf = &leaf{} diff --git a/server/monitor.go b/server/monitor.go index 49e3a1b8..b0b5a9d7 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2096,6 +2096,7 @@ type LeafzOptions struct { // LeafInfo has detailed information on each remote leafnode connection. type LeafInfo struct { + Name string `json:"name"` Account string `json:"account"` IP string `json:"ip"` Port int `json:"port"` @@ -2135,6 +2136,7 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) { for _, ln := range lconns { ln.mu.Lock() lni := &LeafInfo{ + Name: ln.opts.Name, Account: ln.acc.Name, IP: ln.host, Port: int(ln.port), diff --git a/server/monitor_test.go b/server/monitor_test.go index 78d7ef2f..b8b08e0f 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3707,11 +3707,13 @@ func TestMonitorLeafz(t *testing.T) { } acc1, mycreds1 := createAcc(t) acc2, mycreds2 := createAcc(t) + leafName := "my-leaf-node" content = ` port: -1 http: "127.0.0.1:-1" ping_interval = 1 + server_name: %s accounts { %s { users [ @@ -3740,6 +3742,7 @@ func TestMonitorLeafz(t *testing.T) { } ` config := fmt.Sprintf(content, + leafName, acc1.Name, acc2.Name, acc1.Name, ob.LeafNode.Port, mycreds1, acc2.Name, ob.LeafNode.Port, mycreds2) @@ -3814,6 +3817,9 @@ func TestMonitorLeafz(t *testing.T) { } else { t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account) } + if ln.Name != leafName { + t.Fatalf("Expected name to be %q, got %q", leafName, ln.Name) + } if ln.RTT == "" { t.Fatalf("RTT not tracked?") } @@ -3902,6 +3908,9 @@ func TestMonitorLeafz(t *testing.T) { } else { t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account) } + if ln.Name != leafName { + t.Fatalf("Expected name to be %q, got %q", leafName, ln.Name) + } if ln.RTT == "" { t.Fatalf("RTT not tracked?") } diff --git a/server/norace_test.go b/server/norace_test.go index 8bbb719b..074bd45e 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3444,7 +3444,7 @@ func TestNoRaceJetStreamClusterMemoryStreamConsumerRaftGrowth(t *testing.T) { t.Fatalf("Error looking up consumer %q", "q1") } node := o.raftNode().(*raft) - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 
100*time.Millisecond, func() error { if ms := node.wal.(*memStore); ms.State().Msgs > 8192 { return fmt.Errorf("Did not compact the raft memory WAL") } diff --git a/server/raft.go b/server/raft.go index 5b05798a..b98db197 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2733,7 +2733,11 @@ func (n *raft) createCatchup(ae *appendEntry) string { // Truncate our WAL and reset. // Lock should be held. func (n *raft) truncateWAL(term, index uint64) { - n.debug("Truncating and repairing WAL") + n.debug("Truncating and repairing WAL to Term %d Index %d", term, index) + + if term == 0 && index == 0 { + n.warn("Resetting WAL state") + } defer func() { // Check to see if we invalidated any snapshots that might have held state @@ -2762,6 +2766,11 @@ func (n *raft) truncateWAL(term, index uint64) { } +// Reset our WAL. +func (n *raft) resetWAL() { + n.truncateWAL(0, 0) +} + // Lock should be held func (n *raft) updateLeader(newLeader string) { n.leader = newLeader @@ -3097,7 +3106,9 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { n.term = ar.term n.vote = noVote n.writeTermVote() + n.warn("Detected another leader with higher term, will stepdown and reset") n.stepdown.push(noLeader) + n.resetWAL() } else if ar.reply != _EMPTY_ { n.catchupFollower(ar) } @@ -3109,14 +3120,6 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Accoun ar := n.decodeAppendEntryResponse(msg) ar.reply = reply n.resp.push(ar) - if ar.success { - n.Lock() - // Update peer's last index. - if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li { - ps.li = ar.index - } - n.Unlock() - } } func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { @@ -3371,6 +3374,10 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) { // Lock should be held. func (n *raft) setWriteErrLocked(err error) { + // Check if we are closed already. + if n.state == Closed { + return + } // Ignore if already set. if n.werr == err || err == nil { return @@ -3409,6 +3416,12 @@ func (n *raft) fileWriter() { psf := filepath.Join(n.sd, peerStateFile) n.RUnlock() + isClosed := func() bool { + n.RLock() + defer n.RUnlock() + return n.state == Closed + } + for s.isRunning() { select { case <-n.quit: @@ -3421,7 +3434,7 @@ func (n *raft) fileWriter() { <-dios err := os.WriteFile(tvf, buf[:], 0640) dios <- struct{}{} - if err != nil { + if err != nil && !isClosed() { n.setWriteErr(err) n.warn("Error writing term and vote file for %q: %v", n.group, err) } @@ -3432,7 +3445,7 @@ func (n *raft) fileWriter() { <-dios err := os.WriteFile(psf, buf, 0640) dios <- struct{}{} - if err != nil { + if err != nil && !isClosed() { n.setWriteErr(err) n.warn("Error writing peer state file for %q: %v", n.group, err) } diff --git a/server/stream.go b/server/stream.go index c8e59d37..7de552b6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -506,8 +506,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Assign our transform for republishing. mset.tr = tr } - - jsa.streams[cfg.Name] = mset storeDir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name) jsa.mu.Unlock() @@ -594,6 +592,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } } + // Register with our account last. + jsa.mu.Lock() + jsa.streams[cfg.Name] = mset + jsa.mu.Unlock() + return mset, nil } @@ -3627,39 +3630,9 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) { // processInboundJetStreamMsg handles processing messages bound for a stream. 
 func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
-	mset.mu.RLock()
-	isLeader, isClustered, isSealed := mset.isLeader(), mset.isClustered(), mset.cfg.Sealed
-	mset.mu.RUnlock()
-
-	// If we are not the leader just ignore.
-	if !isLeader {
-		return
-	}
-
-	if isSealed {
-		var resp = JSPubAckResponse{
-			PubAck: &PubAck{Stream: mset.name()},
-			Error:  NewJSStreamSealedError(),
-		}
-		b, _ := json.Marshal(resp)
-		mset.outq.sendMsg(reply, b)
-		return
-	}
-
+	// Always move this to another Go routine.
 	hdr, msg := c.msgParts(rmsg)
-
-	// If we are not receiving directly from a client we should move this to another Go routine.
-	if c.kind != CLIENT {
-		mset.queueInboundMsg(subject, reply, hdr, msg)
-		return
-	}
-
-	// If we are clustered we need to propose this message to the underlying raft group.
-	if isClustered {
-		mset.processClusteredInboundMsg(subject, reply, hdr, msg)
-	} else {
-		mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
-	}
+	mset.queueInboundMsg(subject, reply, hdr, msg)
 }
 
 var (
@@ -3696,11 +3669,24 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	numConsumers := len(mset.consumers)
 	interestRetention := mset.cfg.Retention == InterestPolicy
 	// Snapshot if we are the leader and if we can respond.
-	isLeader := mset.isLeader()
+	isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
 	canRespond := doAck && len(reply) > 0 && isLeader
 
 	var resp = &JSPubAckResponse{}
 
+	// Bail here if sealed.
+	if isSealed {
+		outq := mset.outq
+		mset.mu.Unlock()
+		if canRespond && outq != nil {
+			resp.PubAck = &PubAck{Stream: name}
+			resp.Error = ApiErrors[JSStreamSealedErr]
+			b, _ := json.Marshal(resp)
+			outq.sendMsg(reply, b)
+		}
+		return ApiErrors[JSStreamSealedErr]
+	}
+
 	var buf [256]byte
 	pubAck := append(buf[:0], mset.pubAck...)
@@ -4161,13 +4147,13 @@ func (mset *stream) signalConsumersLoop() {
 
 // This will update and signal all consumers that match.
 func (mset *stream) signalConsumers(subj string, seq uint64) {
 	mset.clsMu.RLock()
-	defer mset.clsMu.RUnlock()
-
 	if mset.csl == nil {
+		mset.clsMu.RUnlock()
 		return
 	}
 	r := mset.csl.Match(subj)
+	mset.clsMu.RUnlock()
+
 	if len(r.psubs) == 0 {
 		return
 	}
@@ -4759,19 +4745,20 @@ func (mset *stream) state() StreamState {
 }
 
 func (mset *stream) stateWithDetail(details bool) StreamState {
-	mset.mu.RLock()
-	c, store := mset.client, mset.store
-	mset.mu.RUnlock()
-	if c == nil || store == nil {
+	// mset.store does not change once set, so ok to reference here directly.
+	// We do this elsewhere as well.
+	store := mset.store
+	if store == nil {
 		return StreamState{}
 	}
-	// Currently rely on store.
+
+	// Currently rely on store for details.
 	if details {
 		return store.State()
 	}
 	// Here we do the fast version.
 	var state StreamState
-	mset.store.FastState(&state)
+	store.FastState(&state)
 	return state
 }
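
For reference, below is a minimal standalone Go sketch of two concurrency patterns this patch relies on. It is illustrative only: writeFileGated and resetOrStart are hypothetical helper names, not server APIs; the real logic lives in writePerSubjectInfo (filestore.go) and checkPending (consumer.go) above. The sketch assumes, as the patch does, that a failed gated write is acceptable because the data can be rebuilt on restart.

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

var errDIOStalled = errors.New("IO is stalled")

// dios is a counting semaphore built from a buffered channel: its capacity is
// the maximum number of concurrent blocking IO calls (the patch lowers it to 4).
var dios chan struct{}

func init() {
	const nIO = 4
	dios = make(chan struct{}, nIO)
	// Fill it up to start.
	for i := 0; i < nIO; i++ {
		dios <- struct{}{}
	}
}

// writeFileGated tries to take a semaphore slot without blocking. If no slot
// is free it fails fast with errDIOStalled rather than stalling while a lock
// is held, which is safe whenever the data can be rebuilt later.
func writeFileGated(name string, data []byte) error {
	select {
	case <-dios:
		err := os.WriteFile(name, data, 0644)
		dios <- struct{}{} // release the slot
		return err
	default:
		return errDIOStalled
	}
}

// resetOrStart mirrors the checkPending timer guard: the timer may have been
// stopped and nilled while the lock was released, so re-arm a fresh timer
// instead of calling Reset on a nil *time.Timer.
func resetOrStart(t **time.Timer, d time.Duration, f func()) {
	if *t == nil {
		*t = time.AfterFunc(d, f)
	} else {
		(*t).Reset(d)
	}
}

func main() {
	fmt.Println(writeFileGated(filepath.Join(os.TempDir(), "example.dat"), []byte("hello")))
	var tmr *time.Timer
	resetOrStart(&tmr, 10*time.Millisecond, func() { fmt.Println("fired") })
	time.Sleep(20 * time.Millisecond)
}

The buffered-channel approach matches the server's own rationale in filestore.go: a simple counting semaphore where, per the existing comment, golang.org's semaphore "seemed a bit heavy".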