From 74b703549df6d6da77a9c445fe880612ff443a30 Mon Sep 17 00:00:00 2001
From: Waldemar Quevedo
Date: Mon, 27 Feb 2023 05:42:11 -0800
Subject: [PATCH 01/26] Add raft query parameter to /jsz to include raft group info

Signed-off-by: Waldemar Quevedo
---
 server/monitor.go      | 56 +++++++++++++++++++++++++++++++-----------
 server/monitor_test.go | 28 +++++++++++++++++++++
 2 files changed, 70 insertions(+), 14 deletions(-)

diff --git a/server/monitor.go b/server/monitor.go
index 5cda71c2..b19ffb2c 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -2636,6 +2636,7 @@ type JSzOptions struct {
 	LeaderOnly bool `json:"leader_only,omitempty"`
 	Offset     int  `json:"offset,omitempty"`
 	Limit      int  `json:"limit,omitempty"`
+	RaftGroups bool `json:"raft,omitempty"`
 }
 
 // HealthzOptions are options passed to Healthz
@@ -2646,15 +2647,24 @@ type HealthzOptions struct {
 	JSServerOnly bool `json:"js-server-only,omitempty"`
 }
 
+// StreamDetail shows information about the stream state and its consumers.
 type StreamDetail struct {
-	Name     string              `json:"name"`
-	Created  time.Time           `json:"created"`
-	Cluster  *ClusterInfo        `json:"cluster,omitempty"`
-	Config   *StreamConfig       `json:"config,omitempty"`
-	State    StreamState         `json:"state,omitempty"`
-	Consumer []*ConsumerInfo     `json:"consumer_detail,omitempty"`
-	Mirror   *StreamSourceInfo   `json:"mirror,omitempty"`
-	Sources  []*StreamSourceInfo `json:"sources,omitempty"`
+	Name               string              `json:"name"`
+	Created            time.Time           `json:"created"`
+	Cluster            *ClusterInfo        `json:"cluster,omitempty"`
+	Config             *StreamConfig       `json:"config,omitempty"`
+	State              StreamState         `json:"state,omitempty"`
+	Consumer           []*ConsumerInfo     `json:"consumer_detail,omitempty"`
+	Mirror             *StreamSourceInfo   `json:"mirror,omitempty"`
+	Sources            []*StreamSourceInfo `json:"sources,omitempty"`
+	RaftGroup          string              `json:"stream_raft_group,omitempty"`
+	ConsumerRaftGroups []*RaftGroupDetail  `json:"consumer_raft_groups,omitempty"`
+}
+
+// RaftGroupDetail shows details about the Raft group.
+type RaftGroupDetail struct { + Name string `json:"name"` + RaftGroup string `json:"raft_group,omitempty"` } type AccountDetail struct { @@ -2690,7 +2700,7 @@ type JSInfo struct { AccountDetails []*AccountDetail `json:"account_details,omitempty"` } -func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg bool) *AccountDetail { +func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft bool) *AccountDetail { jsa.mu.RLock() acc := jsa.account name := acc.GetName() @@ -2729,7 +2739,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg if optStreams { for _, stream := range streams { - ci := s.js.clusterInfo(stream.raftGroup()) + rgroup := stream.raftGroup() + ci := s.js.clusterInfo(rgroup) var cfg *StreamConfig if optCfg { c := stream.config() @@ -2744,17 +2755,28 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg Mirror: stream.mirrorInfo(), Sources: stream.sourcesInfo(), } + if optRaft && rgroup != nil { + sdet.RaftGroup = rgroup.Name + sdet.ConsumerRaftGroups = make([]*RaftGroupDetail, 0) + } if optConsumers { for _, consumer := range stream.getPublicConsumers() { cInfo := consumer.info() if cInfo == nil { continue } - if !optCfg { cInfo.Config = nil } sdet.Consumer = append(sdet.Consumer, cInfo) + if optRaft { + crgroup := consumer.raftGroup() + if crgroup != nil { + sdet.ConsumerRaftGroups = append(sdet.ConsumerRaftGroups, + &RaftGroupDetail{cInfo.Name, crgroup.Name}, + ) + } + } } } detail.Streams = append(detail.Streams, sdet) @@ -2778,7 +2800,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } - return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config), nil + return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups), nil } // helper to get cluster info from node via dummy group @@ -2905,7 +2927,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { } // if wanted, obtain accounts/streams/consumer for _, jsa := range accounts { - detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config) + detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups) jsi.AccountDetails = append(jsi.AccountDetails, detail) } return jsi, nil @@ -2944,6 +2966,10 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { if err != nil { return } + rgroups, err := decodeBool(w, r, "raft") + if err != nil { + return + } l, err := s.Jsz(&JSzOptions{ r.URL.Query().Get("acc"), @@ -2953,7 +2979,9 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { config, leader, offset, - limit}) + limit, + rgroups, + }) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) diff --git a/server/monitor_test.go b/server/monitor_test.go index 6f00e18f..67fedb5c 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4308,6 +4308,9 @@ func TestMonitorJsz(t *testing.T) { if info.AccountDetails[0].Streams[0].Consumer[0].Config != nil { t.Fatal("Config expected to not be present") } + if len(info.AccountDetails[0].Streams[0].ConsumerRaftGroups) != 0 { + t.Fatalf("expected consumer raft groups to not be returned by %s but got %v", url, info) + } } }) t.Run("config", func(t *testing.T) { @@ -4383,6 +4386,31 @@ func TestMonitorJsz(t *testing.T) { } } }) + t.Run("raftgroups", func(t *testing.T) { + for _, url := range []string{monUrl1, monUrl2} { + info := 
readJsInfo(url + "?acc=ACC&consumers=true&raft=true") + if len(info.AccountDetails) != 1 { + t.Fatalf("expected account ACC to be returned by %s but got %v", url, info) + } + if len(info.AccountDetails[0].Streams[0].Consumer) == 0 { + t.Fatalf("expected consumers to be returned by %s but got %v", url, info) + } + if len(info.AccountDetails[0].Streams[0].ConsumerRaftGroups) == 0 { + t.Fatalf("expected consumer raft groups to be returned by %s but got %v", url, info) + } + srgroup := info.AccountDetails[0].Streams[0].RaftGroup + if len(srgroup) == 0 { + t.Fatal("expected stream raft group info to be included") + } + crgroup := info.AccountDetails[0].Streams[0].ConsumerRaftGroups[0] + if crgroup.Name != "my-consumer-replicated" { + t.Fatalf("expected consumer name to be included in raft group info, got: %v", crgroup.Name) + } + if len(crgroup.RaftGroup) == 0 { + t.Fatal("expected consumer raft group info to be included") + } + } + }) } func TestMonitorReloadTLSConfig(t *testing.T) { From 43916290df55b3a229c9b0ea4d89e6641e5f3b0c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 11:20:37 -0800 Subject: [PATCH 02/26] Make minimum snapshot time for all assets 10s. Do not lock on clustered test for JetStream, not needed. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 00f935e1..46ff4f99 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -597,10 +597,8 @@ func (s *Server) enableJetStreamClustering() error { // isClustered returns if we are clustered. // Lock should not be held. func (js *jetStream) isClustered() bool { - js.mu.RLock() - isClustered := js.cluster != nil - js.mu.RUnlock() - return isClustered + // This is only ever set, no need for lock here. + return js.cluster != nil } // isClusteredNoLock returns if we are clustered, but unlike isClustered() does @@ -989,6 +987,7 @@ func (js *jetStream) monitorCluster() { isLeader bool lastSnap []byte lastSnapTime time.Time + minSnapDelta = 10 * time.Second ) // Highwayhash key for generating hashes. @@ -1001,7 +1000,7 @@ func (js *jetStream) monitorCluster() { // Snapshotting function. doSnapshot := func() { // Suppress during recovery. - if js.isMetaRecovering() { + if js.isMetaRecovering() || time.Since(lastSnapTime) < minSnapDelta { return } snap := js.metaSnapshot() @@ -1275,7 +1274,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { for _, ca := range sa.consumers { caAdd = append(caAdd, ca) } - if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil { for _, ca := range osa.consumers { if sa.consumers[ca.Name] == nil { @@ -1843,7 +1841,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps compactInterval = 2 * time.Minute compactSizeMin = 8 * 1024 * 1024 compactNumMin = 65536 - minSnapDelta = 5 * time.Second + minSnapDelta = 10 * time.Second ) // Spread these out for large numbers on server restart. @@ -4010,7 +4008,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { compactInterval = 2 * time.Minute compactSizeMin = 64 * 1024 // What is stored here is always small for consumers. compactNumMin = 1024 - minSnapDelta = 5 * time.Second + minSnapDelta = 10 * time.Second ) // Spread these out for large numbers on server restart. 
From 2711460b7b649ac02b78d4c49292e184b10e3cd9 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 27 Feb 2023 11:21:33 -0800
Subject: [PATCH 03/26] Prevent benign spin between competing leaders with same index but different term.

Remove lock from route processing for updating peers' progress, already
handled in trackPeer.

Signed-off-by: Derek Collison
---
 server/raft.go | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/server/raft.go b/server/raft.go
index 5b05798a..7dabfc64 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -2733,7 +2733,11 @@ func (n *raft) createCatchup(ae *appendEntry) string {
 // Truncate our WAL and reset.
 // Lock should be held.
 func (n *raft) truncateWAL(term, index uint64) {
-	n.debug("Truncating and repairing WAL")
+	n.debug("Truncating and repairing WAL to Term %d Index %d", term, index)
+
+	if term == 0 && index == 0 {
+		n.warn("Resetting WAL state")
+	}
 
 	defer func() {
 		// Check to see if we invalidated any snapshots that might have held state
@@ -2762,6 +2766,11 @@
 }
 
+// Reset our WAL.
+func (n *raft) resetWAL() {
+	n.truncateWAL(0, 0)
+}
+
 // Lock should be held
 func (n *raft) updateLeader(newLeader string) {
 	n.leader = newLeader
@@ -3097,7 +3106,9 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
 			n.term = ar.term
 			n.vote = noVote
 			n.writeTermVote()
+			n.warn("Detected another leader with higher term, will stepdown and reset")
 			n.stepdown.push(noLeader)
+			n.resetWAL()
 		} else if ar.reply != _EMPTY_ {
 			n.catchupFollower(ar)
 		}
@@ -3109,14 +3120,6 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Accoun
 	ar := n.decodeAppendEntryResponse(msg)
 	ar.reply = reply
 	n.resp.push(ar)
-	if ar.success {
-		n.Lock()
-		// Update peer's last index.
-		if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li {
-			ps.li = ar.index
-		}
-		n.Unlock()
-	}
 }
 
 func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {

From 3cebd26ef9f8169094e4fc089d2d73a2dac74963 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 27 Feb 2023 11:22:45 -0800
Subject: [PATCH 04/26] Optimize for high IO workloads.

When we know optional metadata will always be correct on restart, do not
require inline IO all the time.

Signed-off-by: Derek Collison
---
 server/filestore.go | 103 +++++++++++++++++++++++++++++---------------
 1 file changed, 69 insertions(+), 34 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index 00fa70ab..859e15ed 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -270,9 +270,9 @@ const (
 	// Check for bad record length value due to corrupt data.
 	rlBadThresh = 32 * 1024 * 1024
 	// Time threshold to write index info.
-	wiThresh = int64(2 * time.Second)
+	wiThresh = int64(30 * time.Second)
 	// Time threshold to write index info for non FIFO cases
-	winfThresh = int64(500 * time.Millisecond)
+	winfThresh = int64(2 * time.Second)
 )
 
 func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
@@ -1192,7 +1192,7 @@ func (fs *fileStore) recoverMsgs() error {
 		if mb.msgs == 0 && mb.rbytes == 0 {
 			if mb == fs.lmb {
 				mb.first.seq, mb.first.ts = mb.last.seq+1, 0
-				mb.closeAndKeepIndex()
+				mb.closeAndKeepIndex(false)
 			} else {
 				emptyBlks = append(emptyBlks, mb)
 			}
@@ -1263,7 +1263,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
 			if mb == fs.lmb {
 				// Do this part by hand since not deleting one by one.
 				mb.first.seq, mb.first.ts = mb.last.seq+1, 0
-				mb.closeAndKeepIndex()
+				mb.closeAndKeepIndex(false)
 				// Clear any global subject state.
 				fs.psim = make(map[string]*psi)
 				return false
@@ -2282,12 +2282,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
 		if fseq == 0 {
 			fseq, _ = fs.firstSeqForSubj(subj)
 		}
-		if ok, _ := fs.removeMsg(fseq, false, false); ok {
+		if ok, _ := fs.removeMsgViaLimits(fseq); ok {
 			// Make sure we are below the limit.
 			if psmc--; psmc >= mmp {
 				for info, ok := fs.psim[subj]; ok && info.total > mmp; info, ok = fs.psim[subj] {
 					if seq, _ := fs.firstSeqForSubj(subj); seq > 0 {
-						if ok, _ := fs.removeMsg(seq, false, false); !ok {
+						if ok, _ := fs.removeMsgViaLimits(seq); !ok {
 							break
 						}
 					} else {
@@ -2539,7 +2539,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
 			m, _, err := mb.firstMatching(subj, false, seq, &sm)
 			if err == nil {
 				seq = m.seq + 1
-				if removed, _ := fs.removeMsg(m.seq, false, false); removed {
+				if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
 					total--
 					blks[mb] = struct{}{}
 				}
@@ -2560,17 +2560,24 @@
 // Lock should be held.
 func (fs *fileStore) deleteFirstMsg() (bool, error) {
-	return fs.removeMsg(fs.state.FirstSeq, false, false)
+	return fs.removeMsgViaLimits(fs.state.FirstSeq)
+}
+
+// If we remove via limits that can always be recovered on a restart, we
+// do not force the system to update the index file.
+// Lock should be held.
+func (fs *fileStore) removeMsgViaLimits(seq uint64) (bool, error) {
+	return fs.removeMsg(seq, false, true, false)
 }
 
 // RemoveMsg will remove the message from this store.
 // Will return the number of bytes removed.
 func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) {
-	return fs.removeMsg(seq, false, true)
+	return fs.removeMsg(seq, false, false, true)
 }
 
 func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
-	return fs.removeMsg(seq, true, true)
+	return fs.removeMsg(seq, true, false, true)
 }
 
 // Convenience function to remove per subject tracking at the filestore level.
@@ -2590,7 +2597,7 @@ func (fs *fileStore) removePerSubject(subj string) {
 }
 
 // Remove a message, optionally rewriting the mb file.
-func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
+func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (bool, error) {
 	if seq == 0 {
 		return false, ErrStoreMsgNotFound
 	}
@@ -2695,7 +2702,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
 	fifo := seq == mb.first.seq
 	isLastBlock := mb == fs.lmb
 	isEmpty := mb.msgs == 0
-	shouldWriteIndex := !isEmpty
+	// If we are removing the message via limits we do not need to write the index file here.
+	// If viaLimits, this means on a restart we will properly clean up these messages regardless.
+	shouldWriteIndex := !isEmpty && !viaLimits
 
 	if fifo {
 		mb.selectNextFirst()
@@ -2724,10 +2733,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
 
 	var firstSeqNeedsUpdate bool
 
-	// Decide how we want to clean this up. If last block we will hold into index.
+	// Decide how we want to clean this up. If last block and the only block left we will hold onto the index.
 	if isEmpty {
 		if isLastBlock {
-			mb.closeAndKeepIndex()
+			mb.closeAndKeepIndex(viaLimits)
+			// We do not need to writeIndex since we just did so above.
+			shouldWriteIndex = false
 		} else {
 			fs.removeMsgBlock(mb)
 		}
@@ -3441,7 +3452,9 @@ func (fs *fileStore) expireMsgs() {
 	fs.mu.RUnlock()
 
 	for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
-		fs.removeMsg(sm.seq, false, true)
+		fs.mu.Lock()
+		fs.removeMsgViaLimits(sm.seq)
+		fs.mu.Unlock()
 		// Recalculate in case we are expiring a bunch.
 		minAge = time.Now().UnixNano() - maxAge
 	}
@@ -3788,7 +3801,7 @@ func (fs *fileStore) syncBlocks() {
 			mb.ifd.Truncate(mb.liwsz)
 			mb.ifd.Sync()
 		}
-		// See if we can close FDs do to being idle.
+		// See if we can close FDs due to being idle.
 		if mb.ifd != nil || mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
 			mb.dirtyCloseWithRemove(false)
 		}
@@ -4241,6 +4254,7 @@ var (
 	errNoMsgBlk      = errors.New("no message block")
 	errMsgBlkTooBig  = errors.New("message block size exceeded int capacity")
 	errUnknownCipher = errors.New("unknown cipher")
+	errDIOStalled    = errors.New("IO is stalled")
 )
 
 // Used for marking messages that have had their checksums checked.
@@ -4755,7 +4769,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
 	}
 
 	// Check if this will be a short write, and if so truncate before writing here.
-	if int64(len(buf)) < mb.liwsz {
+	// We only really need to truncate if we are encrypted or we have dmap entries.
+	// If no dmap entries readIndexInfo does the right thing in the presence of extra data left over.
+	if int64(len(buf)) < mb.liwsz && (mb.aek != nil || len(mb.dmap) > 0) {
 		if err := mb.ifd.Truncate(0); err != nil {
 			mb.werr = err
 			return err
@@ -4770,7 +4786,6 @@
 	} else {
 		mb.werr = err
 	}
-
 	return err
 }
@@ -5525,22 +5540,37 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
 
 // When we have an empty block but want to keep the index for timestamp info etc.
 // Lock should be held.
-func (mb *msgBlock) closeAndKeepIndex() {
-	// We will leave a 0 length blk marker.
-	if mb.mfd != nil {
-		mb.mfd.Truncate(0)
-	} else {
-		// We were closed, so just write out an empty file.
+func (mb *msgBlock) closeAndKeepIndex(viaLimits bool) {
+	// We will leave a 0 length blk marker so we can remember.
+	doIO := func() {
+		// Write out empty file.
 		os.WriteFile(mb.mfn, nil, defaultFilePerms)
+		// Make sure to write the index file so we can remember last seq and ts.
+		mb.writeIndexInfoLocked()
+		mb.removePerSubjectInfoLocked()
+	}
+
+	if mb.mfd != nil {
+		mb.mfd.Close()
+		mb.mfd = nil
+		// We used to truncate here but that can be quite expensive based on platform.
 	}
-	// Make sure to write the index file so we can remember last seq and ts.
-	mb.writeIndexInfoLocked()
 	// Close
 	mb.dirtyCloseWithRemove(false)
 	// Make sure to remove fss state.
 	mb.fss = nil
-	mb.removePerSubjectInfoLocked()
+
+	// If via limits we can do this out of line in a separate Go routine.
+	if viaLimits {
+		go func() {
+			mb.mu.Lock()
+			defer mb.mu.Unlock()
+			doIO()
+		}()
+	} else {
+		doIO()
+	}
 
 	// If we are encrypted we should reset our bek counter.
 	if mb.bek != nil {
@@ -5900,13 +5930,18 @@ func (mb *msgBlock) writePerSubjectInfo() error {
 	b.Write(mb.lchk[:])
 
 	// Gate this for when we have a large number of blocks expiring at the same time.
-	<-dios
-	err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
-	dios <- struct{}{}
-
-	// Clear write flag if no error.
-	if err == nil {
-		mb.fssNeedsWrite = false
+	// Since we have the lock we would rather fail here than block.
+	// This is an optional structure that can be rebuilt on restart.
+ var err error + select { + case <-dios: + if err = os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms); err == nil { + // Clear write flag if no error. + mb.fssNeedsWrite = false + } + <-dios + default: + err = errDIOStalled } return err From 13167f46b9884f3bb4b647a49f928c02af6fb325 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 11:23:57 -0800 Subject: [PATCH 05/26] Optimize some locking for when under heavy loads. Signed-off-by: Derek Collison --- server/consumer.go | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index cb6647a3..d9e4cc87 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1378,7 +1378,7 @@ func (o *consumer) deleteNotActive() { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. go func() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for range ticker.C { js.mu.RLock() @@ -3041,9 +3041,19 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { return nil, 0, errMaxAckPending } + store := o.mset.store + filter, filterWC := o.cfg.FilterSubject, o.filterWC + // Grab next message applicable to us. + // We will unlock here in case lots of contention, e.g. WQ. + o.mu.Unlock() pmsg := getJSPubMsgFromPool() - sm, sseq, err := o.mset.store.LoadNextMsg(o.cfg.FilterSubject, o.filterWC, seq, &pmsg.StoreMsg) + sm, sseq, err := store.LoadNextMsg(filter, filterWC, seq, &pmsg.StoreMsg) + if sm == nil { + pmsg.returnToPool() + pmsg, dc = nil, 0 + } + o.mu.Lock() if sseq >= o.sseq { o.sseq = sseq + 1 @@ -3052,11 +3062,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { } } - if sm == nil { - pmsg.returnToPool() - return nil, 0, err - } - return pmsg, dc, err } @@ -3779,14 +3784,22 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool { // Checks the pending messages. func (o *consumer) checkPending() { - o.mu.Lock() - defer o.mu.Unlock() - + o.mu.RLock() mset := o.mset // On stop, mset and timer will be nil. if mset == nil || o.ptmr == nil { + o.mu.RUnlock() return } + o.mu.RUnlock() + + var shouldUpdateState bool + var state StreamState + mset.store.FastState(&state) + fseq := state.FirstSeq + + o.mu.Lock() + defer o.mu.Unlock() now := time.Now().UnixNano() ttl := int64(o.cfg.AckWait) @@ -3797,11 +3810,6 @@ func (o *consumer) checkPending() { next = int64(o.cfg.BackOff[l-1]) } - var shouldUpdateState bool - var state StreamState - mset.store.FastState(&state) - fseq := state.FirstSeq - // Since we can update timestamps, we have to review all pending. // We will now bail if we see an ack pending in bound to us via o.awl. var expired []uint64 From 2642a8c03debc2c4fb6deeca0929961237a00096 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 11:24:27 -0800 Subject: [PATCH 06/26] Optimize locking for when under heavy loads. 
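
The common pattern in this change is to shrink critical sections: hold
the read lock only long enough to copy or compute what is needed, then
release it before any heavy or blocking work. A rough sketch of the
shape this takes in signalConsumers (simplified from the diff below,
not a complete implementation):

    mset.clsMu.RLock()
    if mset.csl == nil {
        mset.clsMu.RUnlock()
        return
    }
    // Cheap sublist match while holding the read lock.
    r := mset.csl.Match(subj)
    mset.clsMu.RUnlock()
    // Now signal the matched consumers without holding clsMu.
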
Signed-off-by: Derek Collison --- server/consumer.go | 7 ++++++- server/filestore.go | 2 +- server/norace_test.go | 2 +- server/stream.go | 19 ++++++++++--------- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index d9e4cc87..216f8a5e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3876,7 +3876,12 @@ func (o *consumer) checkPending() { } if len(o.pending) > 0 { - o.ptmr.Reset(o.ackWait(time.Duration(next))) + delay := time.Duration(next) + if o.ptmr == nil { + o.ptmr = time.AfterFunc(delay, o.checkPending) + } else { + o.ptmr.Reset(o.ackWait(delay)) + } } else { // Make sure to stop timer and clear out any re delivery queues stopAndClearTimer(&o.ptmr) diff --git a/server/filestore.go b/server/filestore.go index 859e15ed..bbfe072b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5939,7 +5939,7 @@ func (mb *msgBlock) writePerSubjectInfo() error { // Clear write flag if no error. mb.fssNeedsWrite = false } - <-dios + dios <- struct{}{} default: err = errDIOStalled } diff --git a/server/norace_test.go b/server/norace_test.go index 8bbb719b..074bd45e 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3444,7 +3444,7 @@ func TestNoRaceJetStreamClusterMemoryStreamConsumerRaftGrowth(t *testing.T) { t.Fatalf("Error looking up consumer %q", "q1") } node := o.raftNode().(*raft) - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { if ms := node.wal.(*memStore); ms.State().Msgs > 8192 { return fmt.Errorf("Did not compact the raft memory WAL") } diff --git a/server/stream.go b/server/stream.go index 0b4e9d8e..61ee609c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4168,13 +4168,13 @@ func (mset *stream) signalConsumersLoop() { // This will update and signal all consumers that match. func (mset *stream) signalConsumers(subj string, seq uint64) { mset.clsMu.RLock() - defer mset.clsMu.RUnlock() - if mset.csl == nil { + mset.clsMu.RUnlock() return } - r := mset.csl.Match(subj) + mset.clsMu.RUnlock() + if len(r.psubs) == 0 { return } @@ -4756,19 +4756,20 @@ func (mset *stream) state() StreamState { } func (mset *stream) stateWithDetail(details bool) StreamState { - mset.mu.RLock() - c, store := mset.client, mset.store - mset.mu.RUnlock() - if c == nil || store == nil { + // mset.store does not change once set, so ok to reference here directly. + // We do this elsewhere as well. + store := mset.store + if store == nil { return StreamState{} } - // Currently rely on store. + + // Currently rely on store for details. if details { return store.State() } // Here we do the fast version. var state StreamState - mset.store.FastState(&state) + store.FastState(&state) return state } From 97213096017e566c1f5d03c51ca7b3a2ebd32821 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 16:14:37 -0800 Subject: [PATCH 07/26] Do not allow meta snapshot processing during recovery to override. Make sure to process all stream updates during recovery through the ru structure. 
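
Roughly: while the meta layer is recovering, stream and consumer
assignment changes are buffered in the recoveryUpdates structure keyed
per asset, so a later update cancels a pending remove for the same key
(and vice versa), and whatever remains is applied in order once replay
completes. A simplified sketch of that bookkeeping (the struct mirrors
the one used by monitorCluster):

    // Collected while isMetaRecovering(), applied after replay.
    type recoveryUpdates struct {
        removeStreams   map[string]*streamAssignment
        removeConsumers map[string]*consumerAssignment
        updateStreams   map[string]*streamAssignment
        updateConsumers map[string]*consumerAssignment
    }
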
Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 67 ++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 46ff4f99..f9b7e667 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1027,8 +1027,8 @@ func (js *jetStream) monitorCluster() { case <-rqch: return case <-qch: - // Clean signal from shutdown routine so attempt to snapshot meta layer. - doSnapshot() + // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. + n.InstallSnapshot(js.metaSnapshot()) // Return the signal back since shutdown will be waiting. close(qch) return @@ -1038,7 +1038,6 @@ func (js *jetStream) monitorCluster() { if ce == nil { // Signals we have replayed all of our metadata. js.clearMetaRecovering() - // Process any removes that are still valid after recovery. for _, ca := range ru.removeConsumers { js.processConsumerRemoval(ca) @@ -1046,10 +1045,11 @@ func (js *jetStream) monitorCluster() { for _, sa := range ru.removeStreams { js.processStreamRemoval(sa) } - // Same with any pending updates. + // Process pending updates. for _, sa := range ru.updateStreams { js.processUpdateStreamAssignment(sa) } + // Now consumers. for _, ca := range ru.updateConsumers { js.processConsumerAssignment(ca) } @@ -1061,20 +1061,11 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) - // If we processed a snapshot and are recovering remove our pending state. - if didSnap && js.isMetaRecovering() { - ru = &recoveryUpdates{ - removeStreams: make(map[string]*streamAssignment), - removeConsumers: make(map[string]*consumerAssignment), - updateStreams: make(map[string]*streamAssignment), - updateConsumers: make(map[string]*consumerAssignment), - } - } if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() - } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > 5*time.Second { + } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 { doSnapshot() } } @@ -1212,7 +1203,7 @@ func (js *jetStream) metaSnapshot() []byte { return s2.EncodeBetter(nil, b) } -func (js *jetStream) applyMetaSnapshot(buf []byte) error { +func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error { var wsas []writeableStreamAssignment if len(buf) > 0 { jse, err := s2.Decode(nil, buf) @@ -1289,13 +1280,21 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { // Do removals first. for _, sa := range saDel { js.setStreamAssignmentRecovering(sa) - js.processStreamRemoval(sa) + + if isRecovering { + key := sa.recoveryKey() + ru.removeStreams[key] = sa + delete(ru.updateStreams, key) + } else { + js.processStreamRemoval(sa) + } } // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { js.setStreamAssignmentRecovering(sa) js.processStreamAssignment(sa) - // We can simply add the consumers. + + // We can simply process the consumers. for _, ca := range sa.consumers { js.setConsumerAssignmentRecovering(ca) js.processConsumerAssignment(ca) @@ -1306,17 +1305,35 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { // sure to process any changes. 
for _, sa := range saChk { js.setStreamAssignmentRecovering(sa) - js.processUpdateStreamAssignment(sa) + if isRecovering { + key := sa.recoveryKey() + ru.updateStreams[key] = sa + delete(ru.removeStreams, key) + } else { + js.processUpdateStreamAssignment(sa) + } } // Now do the deltas for existing stream's consumers. for _, ca := range caDel { js.setConsumerAssignmentRecovering(ca) - js.processConsumerRemoval(ca) + if isRecovering { + key := ca.recoveryKey() + ru.removeConsumers[key] = ca + delete(ru.updateConsumers, key) + } else { + js.processConsumerRemoval(ca) + } } for _, ca := range caAdd { js.setConsumerAssignmentRecovering(ca) - js.processConsumerAssignment(ca) + if isRecovering { + key := ca.recoveryKey() + delete(ru.removeConsumers, key) + ru.updateConsumers[key] = ca + } else { + js.processConsumerAssignment(ca) + } } return nil @@ -1534,7 +1551,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo for _, e := range entries { if e.Type == EntrySnapshot { - js.applyMetaSnapshot(e.Data) + js.applyMetaSnapshot(e.Data, ru, isRecovering) didSnap = true } else if e.Type == EntryRemovePeer { if !isRecovering { @@ -3139,9 +3156,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme mset.setStreamAssignment(sa) if err = mset.updateWithAdvisory(sa.Config, false); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) - // Process the raft group and make sure it's running if needed. - js.createRaftGroup(acc.GetName(), osa.Group, storage) - mset.setStreamAssignment(osa) + if osa != nil { + // Process the raft group and make sure it's running if needed. + js.createRaftGroup(acc.GetName(), osa.Group, storage) + mset.setStreamAssignment(osa) + } if rg.node != nil { rg.node.Delete() rg.node = nil From 60787065440d58ce954fcef3ce6e19c3840aac16 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 17:08:01 -0800 Subject: [PATCH 08/26] Fixup test for new parameters Signed-off-by: Derek Collison --- server/jetstream_cluster_3_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 4b6ec601..6454baaa 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2809,9 +2809,9 @@ func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) { _, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond)) } - // Needs to be at least 5 seconds, otherwise we won't hit the + // Needs to be at least 10 seconds, otherwise we won't hit the // minSnapDelta that prevents us from snapshotting too often - time.Sleep(time.Second * 6) + time.Sleep(time.Second * 11) for i := 0; i < 1024; i++ { _, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond)) From 576d31748fff498c278ac3b1b85b9bd5790a9a88 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 17:12:23 -0800 Subject: [PATCH 09/26] Sometimes do force meta snapshot Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f9b7e667..610535f8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1000,7 +1000,7 @@ func (js *jetStream) monitorCluster() { // Snapshotting function. doSnapshot := func() { // Suppress during recovery. 
- if js.isMetaRecovering() || time.Since(lastSnapTime) < minSnapDelta { + if js.isMetaRecovering() { return } snap := js.metaSnapshot() @@ -1059,13 +1059,13 @@ func (js *jetStream) monitorCluster() { continue } // FIXME(dlc) - Deal with errors. - if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { + if didSnap, didStreamRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) - if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval { + if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() - } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 { + } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } } @@ -1633,7 +1633,6 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.updateConsumers, key) } else { js.processConsumerRemoval(ca) - didRemove = true } case updateStreamOp: sa, err := decodeStreamAssignment(buf[1:]) From fa8afba68fc327884ea0f5322a4d3b7681e1710a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 17:21:06 -0800 Subject: [PATCH 10/26] Only warn on write errors if not closed in case they linger under pressure and blocking on dios Signed-off-by: Derek Collison --- server/raft.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/server/raft.go b/server/raft.go index 7dabfc64..b98db197 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3374,6 +3374,10 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) { // Lock should be held. func (n *raft) setWriteErrLocked(err error) { + // Check if we are closed already. + if n.state == Closed { + return + } // Ignore if already set. if n.werr == err || err == nil { return @@ -3412,6 +3416,12 @@ func (n *raft) fileWriter() { psf := filepath.Join(n.sd, peerStateFile) n.RUnlock() + isClosed := func() bool { + n.RLock() + defer n.RUnlock() + return n.state == Closed + } + for s.isRunning() { select { case <-n.quit: @@ -3424,7 +3434,7 @@ func (n *raft) fileWriter() { <-dios err := os.WriteFile(tvf, buf[:], 0640) dios <- struct{}{} - if err != nil { + if err != nil && !isClosed() { n.setWriteErr(err) n.warn("Error writing term and vote file for %q: %v", n.group, err) } @@ -3435,7 +3445,7 @@ func (n *raft) fileWriter() { <-dios err := os.WriteFile(psf, buf, 0640) dios <- struct{}{} - if err != nil { + if err != nil && !isClosed() { n.setWriteErr(err) n.warn("Error writing peer state file for %q: %v", n.group, err) } From aad8aa6f213c0f683c16d9d411d5f12cd46a1f1f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 17:40:44 -0800 Subject: [PATCH 11/26] Do not need lock to grab js here Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 610535f8..268030a3 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -198,17 +198,15 @@ func (s *Server) trackedJetStreamServers() (js, total int) { func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { s.mu.RLock() - shutdown := s.shutdown - js := s.js + shutdown, js := s.shutdown, s.js s.mu.RUnlock() if shutdown || js == nil { return nil, nil } - js.mu.RLock() + // Only set once, do not need a lock. 
 	cc := js.cluster
-	js.mu.RUnlock()
 	if cc == nil {
 		return nil, nil
 	}

From 8732022d2682600562c0e8314c990a12808e1d90 Mon Sep 17 00:00:00 2001
From: Tomasz Pietrek
Date: Tue, 28 Feb 2023 09:39:00 +0100
Subject: [PATCH 12/26] Fix JWT claims update if headers are passed in request

The claims update message requires only the payload to be passed, but
passing headers should not fail the request. This change ensures we
extract the payload from the raw message before decoding it.

Before this change, passing a claims update with headers would return a
cryptic `expected x chunks` error.

Signed-off-by: Tomasz Pietrek
---
 server/accounts.go | 10 +++++++--
 server/jwt_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/server/accounts.go b/server/accounts.go
index 472141bf..b68059d0 100644
--- a/server/accounts.go
+++ b/server/accounts.go
@@ -3902,7 +3902,10 @@ func (dr *DirAccResolver) Start(s *Server) error {
 			return fmt.Errorf("error setting up update handling: %v", err)
 		}
 	}
-	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, _ *Account, subj, resp string, msg []byte) {
+	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, c *client, _ *Account, _, resp string, msg []byte) {
+		// As this is a raw message, we need to extract the payload and only decode claims from it,
+		// in case the request is sent with headers.
+		_, msg = c.msgParts(msg)
 		if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
 			respondToUpdate(s, resp, "n/a", "jwt update resulted in error", err)
 		} else if claim.Issuer == op && strict {
@@ -4190,7 +4193,10 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
 			return fmt.Errorf("error setting up update handling: %v", err)
 		}
 	}
-	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, _ *Account, subj, resp string, msg []byte) {
+	if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, c *client, _ *Account, _, resp string, msg []byte) {
+		// As this is a raw message, we need to extract the payload and only decode claims from it,
+		// in case the request is sent with headers.
+		_, msg = c.msgParts(msg)
 		if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
 			respondToUpdate(s, resp, "n/a", "jwt update cache resulted in error", err)
 		} else if claim.Issuer == op && strict {
diff --git a/server/jwt_test.go b/server/jwt_test.go
index 6467e9b3..10858924 100644
--- a/server/jwt_test.go
+++ b/server/jwt_test.go
@@ -6132,6 +6132,61 @@ func TestJWTAccountProtectedImport(t *testing.T) {
 	})
 }
 
+// Headers are ignored in claims update, but passing them should not cause an error.
+func TestJWTClaimsUpdateWithHeaders(t *testing.T) {
+	skp, spub := createKey(t)
+	newUser(t, skp)
+
+	sclaim := jwt.NewAccountClaims(spub)
+	encodeClaim(t, sclaim, spub)
+
+	akp, apub := createKey(t)
+	newUser(t, akp)
+	claim := jwt.NewAccountClaims(apub)
+	jwtClaim := encodeClaim(t, claim, apub)
+
+	dirSrv := t.TempDir()
+
+	conf := createConfFile(t, []byte(fmt.Sprintf(`
+		listen: 127.0.0.1:-1
+		operator: %s
+		system_account: %s
+		resolver: {
+			type: full
+			dir: '%s'
+		}
+	`, ojwt, spub, dirSrv)))
+
+	s, _ := RunServerWithConfig(conf)
+	defer s.Shutdown()
+
+	type zapi struct {
+		Server *ServerInfo
+		Data   *Connz
+		Error  *ApiError
+	}
+
+	sc := natsConnect(t, s.ClientURL(), createUserCreds(t, s, skp))
+	defer sc.Close()
+	// Pass claims update with headers.
+ msg := &nats.Msg{ + Subject: "$SYS.REQ.CLAIMS.UPDATE", + Data: []byte(jwtClaim), + Header: map[string][]string{"key": {"value"}}, + } + resp, err := sc.RequestMsg(msg, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var cz zapi + if err := json.Unmarshal(resp.Data, &cz); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if cz.Error != nil { + t.Fatalf("Unexpected error: %+v", cz.Error) + } +} + func TestJWTMappings(t *testing.T) { sysKp, syspub := createKey(t) sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) From 10c2c387b72720188a7ae4a31db21c7855d32036 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 28 Feb 2023 15:36:37 +0100 Subject: [PATCH 13/26] Fix panic if serviceExport is nil Signed-off-by: Tomasz Pietrek --- server/accounts.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/accounts.go b/server/accounts.go index b68059d0..c0a792e7 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2360,7 +2360,9 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // Always grab time and make sure response threshold timer is running. si.ts = time.Now().UnixNano() - osi.se.setResponseThresholdTimer() + if osi.se != nil { + osi.se.setResponseThresholdTimer() + } if rt == Singleton && tracking { si.latency = osi.latency From bee149b45805cffda219ca1c85fa37031bea2d2c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 17:49:00 -0800 Subject: [PATCH 14/26] Only need server's rlock here. Signed-off-by: Derek Collison --- server/jetstream.go | 4 ++-- server/jetstream_api.go | 20 +++++++++++--------- server/jetstream_cluster.go | 27 ++++++++++++++++++++++----- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 78f4e00c..fd7926f0 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -784,9 +784,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { var js *jetStream - s.mu.Lock() + s.mu.RLock() js = s.js - s.mu.Unlock() + s.mu.RUnlock() return js.isEnabled() } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index cfe06442..890b41a3 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1591,9 +1591,14 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, // Request for the list of all detailed stream info. // TODO(dlc) - combine with above long term func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamEnabled() { + if c == nil { return } + js, cc := s.getJetStreamCluster() + if js == nil { + return + } + ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) if err != nil { s.Warnf(badAPIRequestT, msg) @@ -1606,18 +1611,15 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } // Determine if we should proceed here when we are in clustered mode. - if s.JetStreamIsClustered() { - js, cc := s.getJetStreamCluster() - if js == nil || cc == nil { - return - } - if js.isLeaderless() { + if cc != nil { + leader, leaderless := js.leaderStatus() + if leaderless { resp.Error = NewJSClusterNotAvailError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } // Make sure we are meta leader. 
-		if !s.JetStreamIsLeader() {
+		if !leader {
 			return
 		}
 	}
@@ -1647,7 +1649,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
 	}
 
 	// Clustered mode will invoke a scatter and gather.
-	if s.JetStreamIsClustered() {
+	if cc != nil {
 		// Need to copy these off before sending.. don't move this inside startGoRoutine!!!
 		msg = copyBytes(msg)
 		s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 268030a3..57dd718e 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -206,11 +206,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
 	}
 
 	// Only set once, do not need a lock.
-	cc := js.cluster
-	if cc == nil {
-		return nil, nil
-	}
-	return js, cc
+	return js, js.cluster
 }
 
 func (s *Server) JetStreamIsClustered() bool {
@@ -717,6 +713,27 @@ func (js *jetStream) server() *Server {
 	return s
 }
 
+// Returns leader status, both whether or not we are the leader and if the group is leaderless.
+func (js *jetStream) leaderStatus() (bool, bool) {
+	js.mu.RLock()
+	defer js.mu.RUnlock()
+
+	cc := js.cluster
+	if cc == nil || cc.meta == nil {
+		return false, true
+	}
+	isLeader := cc.meta.Leader()
+	if isLeader {
+		return true, false
+	}
+	// If we don't have a leader.
+	// Make sure we have been running for enough time.
+	if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault {
+		return false, true
+	}
+	return false, false
+}
+
 // Will respond if we do not think we have a metacontroller leader.
 func (js *jetStream) isLeaderless() bool {
 	js.mu.RLock()

From b19fe508c48a170cfa21ee14c842ff1c4e37479a Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 27 Feb 2023 19:25:54 -0800
Subject: [PATCH 15/26] Do not block routes/gws on internal stream and consumer info requests

Signed-off-by: Derek Collison
---
 server/consumer.go          |  3 ++-
 server/filestore.go         |  4 ++++
 server/jetstream_api.go     | 20 +++++++++-----------
 server/jetstream_cluster.go | 28 ++++++----------------------
 server/jetstream_test.go    |  3 ++-
 server/stream.go            |  7 +++++--
 6 files changed, 28 insertions(+), 37 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 216f8a5e..a36373c8 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1138,8 +1138,9 @@ func (o *consumer) setLeader(isLeader bool) {
 	}
 }
 
+// This is coming on the wire so do not block here.
 func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
-	o.infoWithSnapAndReply(false, reply)
+	go o.infoWithSnapAndReply(false, reply)
 }
 
 // Lock should be held.
diff --git a/server/filestore.go b/server/filestore.go
index bbfe072b..4e615238 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -5548,6 +5548,10 @@ func (mb *msgBlock) closeAndKeepIndex(viaLimits bool) {
 		// Make sure to write the index file so we can remember last seq and ts.
 		mb.writeIndexInfoLocked()
 		mb.removePerSubjectInfoLocked()
+		if mb.ifd != nil {
+			mb.ifd.Close()
+			mb.ifd = nil
+		}
 	}
 
 	if mb.mfd != nil {
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 890b41a3..cfe06442 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -1591,14 +1591,9 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
 
 // Request for the list of all detailed stream info.
// TODO(dlc) - combine with above long term func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - if c == nil { + if c == nil || !s.JetStreamEnabled() { return } - js, cc := s.getJetStreamCluster() - if js == nil { - return - } - ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) if err != nil { s.Warnf(badAPIRequestT, msg) @@ -1611,15 +1606,18 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } // Determine if we should proceed here when we are in clustered mode. - if cc != nil { - leader, leaderless := js.leaderStatus() - if leaderless { + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if js.isLeaderless() { resp.Error = NewJSClusterNotAvailError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } // Make sure we are meta leader. - if !leader { + if !s.JetStreamIsLeader() { return } } @@ -1649,7 +1647,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } // Clustered mode will invoke a scatter and gather. - if cc != nil { + if s.JetStreamIsClustered() { // Need to copy these off before sending.. don't move this inside startGoRoutine!!! msg = copyBytes(msg) s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) }) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 57dd718e..98e1d014 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -713,27 +713,6 @@ func (js *jetStream) server() *Server { return s } -// Returns leader status, both whether or not we are the leader and if the group is leaderless. -func (js *jetStream) leaderStatus() (bool, bool) { - js.mu.RLock() - defer js.mu.RUnlock() - - cc := js.cluster - if cc == nil || cc.meta == nil { - return false, true - } - isLeader := cc.meta.Leader() - if isLeader { - return true, false - } - // If we don't have a leader. - // Make sure we have been running for enough time. - if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault { - return false, true - } - return false, false -} - // Will respond if we do not think we have a metacontroller leader. func (js *jetStream) isLeaderless() bool { js.mu.RLock() @@ -7611,7 +7590,12 @@ func (js *jetStream) streamAlternates(ci *ClientInfo, stream string) []StreamAlt return alts } -func (mset *stream) handleClusterStreamInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, _ []byte) { +// Internal request for stream info, this is coming on the wire so do not block here. 
+func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, subject, reply string, _ []byte) { + go mset.processClusterStreamInfoRequest(reply) +} + +func (mset *stream) processClusterStreamInfoRequest(reply string) { mset.mu.RLock() sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg stype := mset.cfg.Storage diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 7a8537ce..7cbe543e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11974,9 +11974,9 @@ func TestJetStreamServerEncryption(t *testing.T) { } for _, c := range cases { - t.Run(c.name, func(t *testing.T) { tmpl := ` + server_name: S22 listen: 127.0.0.1:-1 jetstream: {key: $JS_KEY, store_dir: '%s' %s} ` @@ -18209,6 +18209,7 @@ func TestJetStreamProperErrorDueToOverlapSubjects(t *testing.T) { func TestJetStreamServerCipherConvert(t *testing.T) { tmpl := ` + server_name: S22 listen: 127.0.0.1:-1 jetstream: {key: s3cr3t, store_dir: '%s', cipher: %s} ` diff --git a/server/stream.go b/server/stream.go index 61ee609c..faed1dbd 100644 --- a/server/stream.go +++ b/server/stream.go @@ -468,8 +468,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Assign our transform for republishing. mset.tr = tr } - - jsa.streams[cfg.Name] = mset storeDir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name) jsa.mu.Unlock() @@ -556,6 +554,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } } + // Register with our account last. + jsa.mu.Lock() + jsa.streams[cfg.Name] = mset + jsa.mu.Unlock() + return mset, nil } From adbb50fc21d55f279477a3d8ac38a02fb01cb227 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Feb 2023 07:50:23 -0800 Subject: [PATCH 16/26] Fixed dios capacity to 4 due to testing under heavy load. Signed-off-by: Derek Collison --- server/filestore.go | 2 +- server/jetstream.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 4e615238..93260e0a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6836,7 +6836,7 @@ var dios chan struct{} // golang.org's semaphore seemed a bit heavy. func init() { // Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls. - const nIO = 1024 + const nIO = 4 dios = make(chan struct{}, nIO) // Fill it up to start. for i := 0; i < nIO; i++ { diff --git a/server/jetstream.go b/server/jetstream.go index fd7926f0..3c8c0623 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -853,9 +853,9 @@ func (s *Server) signalPullConsumers() { // Shutdown jetstream for this server. 
 func (s *Server) shutdownJetStream() {
-	s.mu.Lock()
+	s.mu.RLock()
 	js := s.js
-	s.mu.Unlock()
+	s.mu.RUnlock()
 
 	if js == nil {
 		return
 	}

From d85bec2007a9013bc5952566ba7dc5395499f339 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 11:22:16 -0800
Subject: [PATCH 17/26] Do not block in place on warning, and only warn if consumer not closed

Signed-off-by: Derek Collison
---
 server/consumer.go | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index a36373c8..c562564f 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2116,12 +2116,9 @@ func (o *consumer) checkRedelivered(slseq uint64) {
 		}
 	}
 	if shouldUpdateState {
-		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
+		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			// Can not hold lock while gather information about account and stream below.
-			o.mu.Unlock()
-			s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
-			o.mu.Lock()
+			go s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
 		}
 	}
 }
@@ -3892,12 +3889,9 @@ func (o *consumer) checkPending() {
 
 	// Update our state if needed.
 	if shouldUpdateState {
-		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
+		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			// Can not hold lock while gather information about account and stream below.
-			o.mu.Unlock()
-			s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
-			o.mu.Lock()
+			go s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
 		}
 	}
 }

From 3807441fd7a121e5995d2f8a31c5373c0424b2d8 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 11:40:32 -0800
Subject: [PATCH 18/26] Always process inbound messages in a separate execution context.

Do not duplicate work on leader, sealed and clustered state.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 10 +++++++-
 server/stream.go            | 49 ++++++++++++-------------------------
 2 files changed, 25 insertions(+), 34 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 98e1d014..9f1762c3 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -6782,7 +6782,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
 	s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
 	maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
-	isLeader := mset.isLeader()
+	isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
 	mset.mu.RUnlock()
 
 	// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
@@ -6795,6 +6795,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 		return NewJSClusterNotLeaderError()
 	}
 
+	// Bail here if sealed.
+ if isSealed { + var resp = JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}, Error: NewJSStreamSealedError()} + b, _ := json.Marshal(resp) + mset.outq.sendMsg(reply, b) + return NewJSStreamSealedError() + } + // Check here pre-emptively if we have exceeded this server limits. if js.limitsExceeded(stype) { s.resourcesExeededError() diff --git a/server/stream.go b/server/stream.go index faed1dbd..80035576 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3646,39 +3646,9 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) { // processInboundJetStreamMsg handles processing messages bound for a stream. func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - mset.mu.RLock() - isLeader, isClustered, isSealed := mset.isLeader(), mset.isClustered(), mset.cfg.Sealed - mset.mu.RUnlock() - - // If we are not the leader just ignore. - if !isLeader { - return - } - - if isSealed { - var resp = JSPubAckResponse{ - PubAck: &PubAck{Stream: mset.name()}, - Error: NewJSStreamSealedError(), - } - b, _ := json.Marshal(resp) - mset.outq.sendMsg(reply, b) - return - } - + // Always move this to another Go routine. hdr, msg := c.msgParts(rmsg) - - // If we are not receiving directly from a client we should move this to another Go routine. - if c.kind != CLIENT { - mset.queueInboundMsg(subject, reply, hdr, msg) - return - } - - // If we are clustered we need to propose this message to the underlying raft group. - if isClustered { - mset.processClusteredInboundMsg(subject, reply, hdr, msg) - } else { - mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0) - } + mset.queueInboundMsg(subject, reply, hdr, msg) } var ( @@ -3706,11 +3676,24 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, numConsumers := len(mset.consumers) interestRetention := mset.cfg.Retention == InterestPolicy // Snapshot if we are the leader and if we can respond. - isLeader := mset.isLeader() + isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed canRespond := doAck && len(reply) > 0 && isLeader var resp = &JSPubAckResponse{} + // Bail here if sealed. + if isSealed { + outq := mset.outq + mset.mu.Unlock() + if canRespond && outq != nil { + resp.PubAck = &PubAck{Stream: name} + resp.Error = ApiErrors[JSStreamSealedErr] + b, _ := json.Marshal(resp) + outq.sendMsg(reply, b) + } + return ApiErrors[JSStreamSealedErr] + } + var buf [256]byte pubAck := append(buf[:0], mset.pubAck...) From 6bda358fa3ac292e98c72ff1cb3844ae8a05c5bf Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Feb 2023 12:26:11 -0800 Subject: [PATCH 19/26] Fix tests that made assumptions about single server processing. 
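
With inbound stream messages now queued to a separate go routine, a bare
core NATS publish can race the state check that follows it, even against
a single server. Publishing through the JetStream context waits for the
PubAck, so the message is stored before the test asserts. The general
before/after shape of these test changes (a sketch, not an exhaustive
list):

    // Before: racy, nothing waits for the stream to store the message.
    nc.Publish("foo", []byte("Hello World!"))
    nc.Flush()

    // After: js.Publish returns only once the server acks the append.
    _, err := js.Publish("foo", []byte("Hello World!"))
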
From 6bda358fa3ac292e98c72ff1cb3844ae8a05c5bf Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 12:26:11 -0800
Subject: [PATCH 19/26] Fix tests that made assumptions about single server processing.

Signed-off-by: Derek Collison
---
 server/jetstream_test.go | 69 ++++++++++++++++------------------------
 1 file changed, 27 insertions(+), 42 deletions(-)

diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 7cbe543e..eab8a4f5 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -207,12 +207,10 @@ func TestJetStreamAddStream(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

-	nc.Publish("foo", []byte("Hello World!"))
-	nc.Flush()
-
+	js.Publish("foo", []byte("Hello World!"))
 	state := mset.state()
 	if state.Msgs != 1 {
 		t.Fatalf("Expected 1 message, got %d", state.Msgs)
@@ -221,9 +219,7 @@ func TestJetStreamAddStream(t *testing.T) {
 		t.Fatalf("Expected non-zero bytes")
 	}

-	nc.Publish("foo", []byte("Hello World Again!"))
-	nc.Flush()
-
+	js.Publish("foo", []byte("Hello World Again!"))
 	state = mset.state()
 	if state.Msgs != 2 {
 		t.Fatalf("Expected 2 messages, got %d", state.Msgs)
@@ -5035,7 +5031,7 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	sendMsgs := func(toSend int) {
@@ -5046,11 +5042,9 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
 			} else {
 				subj = "foo.ZZ"
 			}
-			if err := nc.Publish(subj, []byte("OK!")); err != nil {
-				return
-			}
+			_, err := js.Publish(subj, []byte("OK!"))
+			require_NoError(t, err)
 		}
-		nc.Flush()
 	}

 	// Send 50 msgs
@@ -5180,14 +5174,14 @@ func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send lots of msgs and have them queued up.
 	for i := 0; i < 10000; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
+
 	if state := mset.state(); state.Msgs != 10000 {
 		t.Fatalf("Expected %d messages, got %d", 10000, state.Msgs)
 	}
@@ -5339,14 +5333,13 @@ func TestJetStreamRedeliverCount(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send 10 msgs
 	for i := 0; i < 10; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 10 {
 		t.Fatalf("Expected %d messages, got %d", 10, state.Msgs)
 	}
@@ -5490,14 +5483,13 @@ func TestJetStreamCanNotNakAckd(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send 10 msgs
 	for i := 0; i < 10; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 10 {
 		t.Fatalf("Expected %d messages, got %d", 10, state.Msgs)
 	}
@@ -5564,14 +5556,13 @@ func TestJetStreamStreamPurge(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send 100 msgs
 	for i := 0; i < 100; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 100 {
 		t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
 	}
@@ -5586,8 +5577,7 @@ func TestJetStreamStreamPurge(t *testing.T) {
 	}
 	time.Sleep(10 * time.Millisecond)
 	now := time.Now()
-	nc.Publish("DC", []byte("OK!"))
-	nc.Flush()
+	js.Publish("DC", []byte("OK!"))

 	state = mset.state()
 	if state.Msgs != 1 {
@@ -5622,14 +5612,13 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send 100 msgs
 	for i := 0; i < 100; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 100 {
 		t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
 	}
@@ -5684,7 +5673,7 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
 		t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer)
 	}
 	// Also make sure we can get new messages correctly.
-	nc.Request("DC", []byte("OK-22"), time.Second)
+	js.Publish("DC", []byte("OK-22"))
 	if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	} else if string(msg.Data) != "OK-22" {
@@ -5713,14 +5702,13 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send 100 msgs
 	for i := 0; i < 100; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()
 	if state := mset.state(); state.Msgs != 100 {
 		t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
 	}
@@ -5768,7 +5756,7 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
 		t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer)
 	}
 	// Also make sure we can get new messages correctly.
-	nc.Request("DC", []byte("OK-22"), time.Second)
+	js.Publish("DC", []byte("OK-22"))
 	if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	} else if string(msg.Data) != "OK-22" {
@@ -5797,16 +5785,15 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
 	}
 	defer mset.delete()

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	// Send 100 msgs
 	totalMsgs := 100

 	for i := 0; i < totalMsgs; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()

 	checkNumMsgs := func(numExpected int) {
 		t.Helper()
@@ -5842,9 +5829,8 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
 	mset.addConsumer(&ConsumerConfig{DeliverSubject: sub3.Subject, AckPolicy: AckNone})

 	for i := 0; i < totalMsgs; i++ {
-		nc.Publish("DC", []byte("OK!"))
+		js.Publish("DC", []byte("OK!"))
 	}
-	nc.Flush()

 	checkNumMsgs(totalMsgs)

@@ -8412,15 +8398,14 @@ func TestJetStreamDeleteMsg(t *testing.T) {
 		t.Fatalf("Unexpected error adding stream: %v", err)
 	}

-	nc := clientConnectToServer(t, s)
+	nc, js := jsClientConnect(t, s)
 	defer nc.Close()

 	pubTen := func() {
 		t.Helper()
 		for i := 0; i < 10; i++ {
-			nc.Publish("foo", []byte("Hello World!"))
+			js.Publish("foo", []byte("Hello World!"))
 		}
-		nc.Flush()
 	}

 	pubTen()
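The reason every one of these tests changes from nc.Publish plus nc.Flush to js.Publish: with inbound processing now on a separate goroutine (patch 18), a flushed core publish only guarantees the server received the bytes, not that the stream has stored the message, so an immediate mset.state() check races. js.Publish blocks until the PubAck returns. A small client-side illustration using nats.go, assuming a local server with JetStream enabled and a stream listening on "foo" (names are illustrative):

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL, nats.Timeout(2*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Fire-and-forget: returns before the stream has stored anything.
	_ = nc.Publish("foo", []byte("racy"))
	_ = nc.Flush()

	// Synchronous: returns only after the stream acknowledges storage,
	// so state checks right after this are deterministic.
	ack, err := js.Publish("foo", []byte("stored"))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("stored in %q at seq %d", ack.Stream, ack.Sequence)
}
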
From 24cb5706464fa0e8707521685412a181c3bbcaab Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 13:05:11 -0800
Subject: [PATCH 20/26] Do not lock on stream name for consumer write state error

Signed-off-by: Derek Collison
---
 server/consumer.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index c562564f..4dfde305 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2118,7 +2118,7 @@ func (o *consumer) checkRedelivered(slseq uint64) {
 	if shouldUpdateState {
 		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			go s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
+			s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
 		}
 	}
 }
@@ -3891,7 +3891,7 @@ func (o *consumer) checkPending() {
 	if shouldUpdateState {
 		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			go s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
+			s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
 		}
 	}
 }
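The fix here is to read mset.cfg.Name directly instead of calling the mset.name() accessor, which takes the stream lock: a locking accessor invoked while the caller already holds a related lock can block or deadlock, whereas the config name does not change once the stream is created and is safe to read from the snapshot. A toy sketch of the distinction, with illustrative types:

package main

import (
	"fmt"
	"sync"
)

type streamConfig struct{ Name string }

type stream struct {
	mu  sync.RWMutex
	cfg streamConfig
}

// name is the locking accessor: fine on its own, but risky to call from a
// code path that already holds consumer or stream locks.
func (s *stream) name() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.cfg.Name
}

func main() {
	s := &stream{cfg: streamConfig{Name: "ORDERS"}}
	// From within already-locked code, prefer the stable field over the
	// locking accessor.
	fmt.Println(s.cfg.Name, s.name())
}
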
From 68cd3128701ef8b4ee768465858605579bafe745 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 13:28:09 -0800
Subject: [PATCH 21/26] Be more conservative on defaultMaxTotalCatchupOutBytes, default to 64M

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 9f1762c3..7a20eda1 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -7638,7 +7638,7 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
 	sysc.sendInternalMsg(reply, _EMPTY_, nil, si)
 }

-const defaultMaxTotalCatchupOutBytes = int64(128 * 1024 * 1024) // 128MB for now, for the total server.
+const defaultMaxTotalCatchupOutBytes = int64(64 * 1024 * 1024) // 64MB for now, for the total server.

 // Current total outstanding catchup bytes.
 func (s *Server) gcbTotal() int64 {

From bab10c1ecbb6c0674f9aabf78c682a885a428af3 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 13:39:24 -0800
Subject: [PATCH 22/26] Revert closeAndKeepIndex behavior

Signed-off-by: Derek Collison
---
 server/filestore.go | 35 ++++++++---------------------------
 1 file changed, 8 insertions(+), 27 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index 93260e0a..bfe93035 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -5541,40 +5541,21 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
 // When we have an empty block but want to keep the index for timestamp info etc.
 // Lock should be held.
 func (mb *msgBlock) closeAndKeepIndex(viaLimits bool) {
-	// We will leave a 0 length blk marker so we can remember.
-	doIO := func() {
-		// Write out empty file.
-		os.WriteFile(mb.mfn, nil, defaultFilePerms)
-		// Make sure to write the index file so we can remember last seq and ts.
-		mb.writeIndexInfoLocked()
-		mb.removePerSubjectInfoLocked()
-		if mb.ifd != nil {
-			mb.ifd.Close()
-			mb.ifd = nil
-		}
-	}
-
+	// We will leave a 0 length blk marker.
 	if mb.mfd != nil {
-		mb.mfd.Close()
-		mb.mfd = nil
-		// We used to truncate here but can be quite expensive based on platform.
+		mb.mfd.Truncate(0)
+	} else {
+		// We were closed, so just write out an empty file.
+		os.WriteFile(mb.mfn, nil, defaultFilePerms)
 	}
+	// Make sure to write the index file so we can remember last seq and ts.
+	mb.writeIndexInfoLocked()
 	// Close
 	mb.dirtyCloseWithRemove(false)

 	// Make sure to remove fss state.
 	mb.fss = nil
-
-	// If via limits we can do this out of line in separate Go routine.
-	if viaLimits {
-		go func() {
-			mb.mu.Lock()
-			defer mb.mu.Unlock()
-			doIO()
-		}()
-	} else {
-		doIO()
-	}
+	mb.removePerSubjectInfoLocked()

 	// If we are encrypted we should reset our bek counter.
 	if mb.bek != nil {
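The reverted behavior boils down to: truncate the block file in place so a zero-length marker keeps its name on disk, and synchronously persist the small index metadata (last sequence and timestamps) needed later, rather than deferring the IO to a goroutine. A rough standalone sketch of that idea, under assumed file names and a simplified index format, none of which match the server's actual on-disk layout:

package main

import (
	"fmt"
	"os"
)

func closeAndKeepIndex(dataPath, idxPath string, lastSeq uint64) error {
	// Truncate the data file to zero so only the marker remains.
	if err := os.Truncate(dataPath, 0); err != nil {
		if !os.IsNotExist(err) {
			return err
		}
		// Block file already gone; write an empty marker file instead.
		if err := os.WriteFile(dataPath, nil, 0640); err != nil {
			return err
		}
	}
	// Remember the last sequence (the server also keeps timestamps).
	return os.WriteFile(idxPath, []byte(fmt.Sprintf("%d", lastSeq)), 0640)
}

func main() {
	_ = os.WriteFile("1.blk", []byte("data"), 0640)
	if err := closeAndKeepIndex("1.blk", "1.idx", 42); err != nil {
		fmt.Println("error:", err)
	}
}
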
From 9d4a603aaff6e8461cfd504ce53be35d7242fdd6 Mon Sep 17 00:00:00 2001
From: Jeremy Saenz
Date: Tue, 28 Feb 2023 14:20:18 -0800
Subject: [PATCH 23/26] Update LEAFZ to include leafnode server/connection name

---
 server/leafnode.go     | 8 +++++++-
 server/monitor.go      | 2 ++
 server/monitor_test.go | 9 +++++++++
 3 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/server/leafnode.go b/server/leafnode.go
index 6be7657e..7f2e8dcf 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -888,7 +888,13 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
 	}

 	now := time.Now().UTC()
-	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
+	var clientOpts = ClientOpts{
+		Name:     opts.ServerName,
+		Verbose:  defaultOpts.Verbose,
+		Pedantic: defaultOpts.Pedantic,
+		Echo:     defaultOpts.Echo,
+	}
+	c := &client{srv: s, nc: conn, kind: LEAF, opts: clientOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}

 	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
 	c.leaf = &leaf{}
diff --git a/server/monitor.go b/server/monitor.go
index b19ffb2c..d4365f60 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -2094,6 +2094,7 @@ type LeafzOptions struct {

 // LeafInfo has detailed information on each remote leafnode connection.
 type LeafInfo struct {
+	Name    string `json:"name"`
 	Account string `json:"account"`
 	IP      string `json:"ip"`
 	Port    int    `json:"port"`
@@ -2133,6 +2134,7 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
 		for _, ln := range lconns {
 			ln.mu.Lock()
 			lni := &LeafInfo{
+				Name:    ln.opts.Name,
 				Account: ln.acc.Name,
 				IP:      ln.host,
 				Port:    int(ln.port),
diff --git a/server/monitor_test.go b/server/monitor_test.go
index 67fedb5c..8096ce22 100644
--- a/server/monitor_test.go
+++ b/server/monitor_test.go
@@ -3707,11 +3707,13 @@ func TestMonitorLeafz(t *testing.T) {
 	}
 	acc1, mycreds1 := createAcc(t)
 	acc2, mycreds2 := createAcc(t)
+	leafName := "my-leaf-node"

 	content = `
 	port: -1
 	http: "127.0.0.1:-1"
 	ping_interval = 1
+	server_name: %s
 	accounts {
 		%s {
 			users [
@@ -3740,6 +3742,7 @@ func TestMonitorLeafz(t *testing.T) {
 	}
 	`
 	config := fmt.Sprintf(content,
+		leafName,
 		acc1.Name, acc2.Name,
 		acc1.Name, ob.LeafNode.Port, mycreds1,
 		acc2.Name, ob.LeafNode.Port, mycreds2)
@@ -3814,6 +3817,9 @@ func TestMonitorLeafz(t *testing.T) {
 			} else {
 				t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account)
 			}
+			if ln.Name != leafName {
+				t.Fatalf("Expected name to be %q, got %q", leafName, ln.Name)
+			}
 			if ln.RTT == "" {
 				t.Fatalf("RTT not tracked?")
 			}
@@ -3902,6 +3908,9 @@ func TestMonitorLeafz(t *testing.T) {
 			} else {
 				t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account)
 			}
+			if ln.Name != leafName {
+				t.Fatalf("Expected name to be %q, got %q", leafName, ln.Name)
+			}
 			if ln.RTT == "" {
 				t.Fatalf("RTT not tracked?")
 			}
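With this patch, each entry returned by the /leafz monitoring endpoint carries the remote leafnode's configured server_name. A small consumer-side sketch, assuming a server with monitoring on localhost:8222; the struct below mirrors only the LeafInfo fields used here, not the server's full type, and the "leafs" key reflects the Leafz payload shown in monitor.go:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type leafInfo struct {
	Name    string `json:"name"`
	Account string `json:"account"`
	IP      string `json:"ip"`
	Port    int    `json:"port"`
}

type leafz struct {
	Leafs []leafInfo `json:"leafs"`
}

func main() {
	resp, err := http.Get("http://localhost:8222/leafz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var lz leafz
	if err := json.NewDecoder(resp.Body).Decode(&lz); err != nil {
		panic(err)
	}
	for _, ln := range lz.Leafs {
		fmt.Printf("leaf %q account=%q %s:%d\n", ln.Name, ln.Account, ln.IP, ln.Port)
	}
}
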
From 724160ebac1b94602355a87b485a50c69b39849e Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 13:52:25 -0800
Subject: [PATCH 24/26] Fix flapping tests

Signed-off-by: Derek Collison
---
 server/consumer.go                     |  2 +-
 server/filestore.go                    |  4 ++--
 server/jetstream_cluster.go            |  5 ++++-
 server/jetstream_cluster_3_test.go     | 20 +++++++++-----------
 server/jetstream_super_cluster_test.go |  7 +------
 5 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 4dfde305..bcc1a959 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1138,7 +1138,7 @@ func (o *consumer) setLeader(isLeader bool) {
 	}
 }

-// This is coming on thw wire so do not block here.
+// This is coming on the wire so do not block here.
 func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
 	go o.infoWithSnapAndReply(false, reply)
 }
diff --git a/server/filestore.go b/server/filestore.go
index bfe93035..7e1725b4 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -2703,7 +2703,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
 	isLastBlock := mb == fs.lmb
 	isEmpty := mb.msgs == 0
 	// If we are removing the message via limits we do not need to write the index file here.
-	// If viaLimits this means ona restart we will properly cleanup these messages regardless.
+	// If viaLimits this means on a restart we will properly cleanup these messages regardless.
 	shouldWriteIndex := !isEmpty && !viaLimits

 	if fifo {
@@ -6816,7 +6816,7 @@ var dios chan struct{}

 // Used to setup our simplistic counting semaphore using buffered channels.
 // golang.org's semaphore seemed a bit heavy.
 func init() {
-	// Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls.
+	// Limit ourselves to a max of 4 blocking IO calls.
 	const nIO = 4
 	dios = make(chan struct{}, nIO)
 	// Fill it up to start.
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 7a20eda1..3a1444da 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -7638,7 +7638,10 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
 	sysc.sendInternalMsg(reply, _EMPTY_, nil, si)
 }

-const defaultMaxTotalCatchupOutBytes = int64(64 * 1024 * 1024) // 64MB for now, for the total server.
+// 64MB for now, for the total server. This is max we will blast out if asked to
+// do so to another server for purposes of catchups.
+// This number should be ok on 1Gbit interface.
+const defaultMaxTotalCatchupOutBytes = int64(64 * 1024 * 1024)

 // Current total outstanding catchup bytes.
 func (s *Server) gcbTotal() int64 {
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 6454baaa..03e1e52e 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -2727,7 +2727,7 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
 				name = test.name
 			}

-			const msgs = 10_000
+			const msgs = 5_000

 			done, count := make(chan bool), 0
 			sub, err := js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
@@ -2740,24 +2740,22 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
 			require_NoError(t, err)

 			// This happens only if we start publishing messages after consumer was created.
-			go func() {
+			pubDone := make(chan struct{})
+			go func(subject string) {
 				for i := 0; i < msgs; i++ {
-					js.PublishAsync(test.subject, []byte("DATA"))
+					js.Publish(subject, []byte("DATA"))
 				}
-				select {
-				case <-js.PublishAsyncComplete():
-				case <-time.After(5 * inactiveThreshold):
-				}
-			}()
+				close(pubDone)
+			}(test.subject)

 			// Wait for inactive threshold to expire and all messages to be published and received
 			// Bug is we clean up active consumers when we should not.
 			time.Sleep(3 * inactiveThreshold / 2)

 			select {
-			case <-done:
-			case <-time.After(5 * time.Second):
-				t.Fatalf("Did not receive done signal")
+			case <-pubDone:
+			case <-time.After(10 * time.Second):
+				t.Fatalf("Did not receive completion signal")
 			}

 			info, err := js.ConsumerInfo(test.stream, name)
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index 3c06d1c6..8abbe17c 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -2082,14 +2082,9 @@ func TestJetStreamSuperClusterMovingStreamAndMoveBack(t *testing.T) {

 	toSend := 10_000
 	for i := 0; i < toSend; i++ {
-		_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
+		_, err := js.Publish("TEST", []byte("HELLO WORLD"))
 		require_NoError(t, err)
 	}
-	select {
-	case <-js.PublishAsyncComplete():
-	case <-time.After(5 * time.Second):
-		t.Fatalf("Did not receive completion signal")
-	}

 	_, err = js.UpdateStream(&nats.StreamConfig{
 		Name: "TEST",
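The filestore hunk above touches a counting semaphore built from a buffered channel. The idiom is worth seeing on its own: a channel of capacity n, pre-filled with n tokens, admits at most n concurrent holders. The nIO = 4 below mirrors the patch's limit on blocking IO calls; everything else in this sketch is illustrative.

package main

import "fmt"

const nIO = 4

var dios chan struct{}

func init() {
	dios = make(chan struct{}, nIO)
	// Fill it up to start.
	for i := 0; i < nIO; i++ {
		dios <- struct{}{}
	}
}

func blockingIO(id int) {
	// Acquire a token; blocks whenever all nIO tokens are taken.
	<-dios
	// Release the token on return.
	defer func() { dios <- struct{}{} }()
	fmt.Println("doing IO", id)
}

func main() {
	done := make(chan struct{})
	for i := 0; i < 10; i++ {
		go func(id int) { blockingIO(id); done <- struct{}{} }(i)
	}
	for i := 0; i < 10; i++ {
		<-done
	}
}
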
From 1956fa3e2309b9fb792709c8e7fb98a1a436dfbf Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 14:05:22 -0800
Subject: [PATCH 25/26] Signal a metasnapshot for consumer deletes as well

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 3a1444da..2cbfc8a9 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1627,6 +1627,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 					delete(ru.updateConsumers, key)
 				} else {
 					js.processConsumerRemoval(ca)
+					didRemove = true
 				}
 			case updateStreamOp:
 				sa, err := decodeStreamAssignment(buf[1:])

From 95ed47186693ec517460e23a26be4d8021edc76b Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 28 Feb 2023 14:56:56 -0800
Subject: [PATCH 26/26] Bump to 2.9.15-RC.3

Signed-off-by: Derek Collison
---
 server/const.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/const.go b/server/const.go
index aa35e30e..8fbc35c1 100644
--- a/server/const.go
+++ b/server/const.go
@@ -41,7 +41,7 @@ var (

 const (
 	// VERSION is the current version for the server.
-	VERSION = "2.9.15-RC.2"
+	VERSION = "2.9.15-RC.3"

 	// PROTO is the currently supported protocol.
 	// 0 was the original