From 7cc49f8c625711b7fc9f79509e1e8280fbcc4f22 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 15 Feb 2021 08:23:16 -0800 Subject: [PATCH 1/6] Use system account when nil Signed-off-by: Derek Collison --- server/jetstream_events.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/jetstream_events.go b/server/jetstream_events.go index 8973c88f..73143cf1 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -6,6 +6,12 @@ import ( ) func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) { + if acc == nil { + acc = s.SystemAccount() + if acc == nil { + return + } + } ej, err := json.Marshal(adv) if err == nil { err = s.sendInternalAccountMsg(acc, subject, ej) From 30cc739797856aff39b8395f4fb98464c9c391cb Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Feb 2021 13:19:13 -0800 Subject: [PATCH 2/6] Increase internal sendq due to JSC Signed-off-by: Derek Collison --- server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 7c15ea3b..b980829a 100644 --- a/server/server.go +++ b/server/server.go @@ -1076,7 +1076,7 @@ func (s *Server) SetDefaultSystemAccount() error { } // For internal sends. -const internalSendQLen = 8192 +const internalSendQLen = 256 * 1024 // Assign a system account. Should only be called once. 
// This sets up a server to send and receive messages from From 0dcb006968eed06af08aa452423984c300831d7f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Feb 2021 13:24:09 -0800 Subject: [PATCH 3/6] Handle AppendEntry response inline, lower outstanding on catchup to stabilize Signed-off-by: Derek Collison --- server/raft.go | 51 ++++++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/server/raft.go b/server/raft.go index d3762cc7..4ed24a7f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -42,6 +42,7 @@ type RaftNode interface { Compact(index uint64) error State() RaftState Size() (entries, bytes uint64) + Progress() (index, commit, applied uint64) Leader() bool Quorum() bool Current() bool @@ -168,7 +169,6 @@ type raft struct { quit chan struct{} reqs chan *voteRequest votes chan *voteResponse - resp chan *appendEntryResponse leadc chan bool stepdown chan string } @@ -329,7 +329,6 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { quit: make(chan struct{}), reqs: make(chan *voteRequest, 8), votes: make(chan *voteResponse, 32), - resp: make(chan *appendEntryResponse, 256), propc: make(chan *Entry, 256), applyc: make(chan *CommittedEntry, 512), leadc: make(chan bool, 8), @@ -680,14 +679,20 @@ func (n *raft) InstallSnapshot(data []byte) error { return errNodeClosed } - ae, err := n.loadEntry(n.applied) - if err != nil { + if state := n.wal.State(); state.LastSeq == n.applied { n.Unlock() - return err + return nil + } + + var term uint64 + if ae, err := n.loadEntry(n.applied); err != nil && ae != nil { + term = ae.term + } else { + term = n.term } snap := &snapshot{ - lastTerm: ae.term, + lastTerm: term, lastIndex: n.applied, peerstate: encodePeerState(&peerState{n.peerNames(), n.csz}), data: data, @@ -704,7 +709,7 @@ func (n *raft) InstallSnapshot(data []byte) error { // Remember our latest snapshot file.
n.snapfile = sfile - _, err = n.wal.Compact(snap.lastIndex) + _, err := n.wal.Compact(snap.lastIndex) n.Unlock() psnaps, _ := ioutil.ReadDir(snapDir) @@ -963,13 +968,20 @@ func (n *raft) campaign() error { return nil } -// State return the current state for this node. +// State returns the current state for this node. func (n *raft) State() RaftState { n.RLock() defer n.RUnlock() return n.state } +// Progress returns the current index, commit and applied values. +func (n *raft) Progress() (index, commit, applied uint64) { + n.RLock() + defer n.RUnlock() + return n.pindex + 1, n.commit, n.applied +} + // Size returns number of entries and total bytes for our WAL. func (n *raft) Size() (uint64, uint64) { n.RLock() @@ -1474,13 +1486,6 @@ func (n *raft) runAsLeader() { case newLeader := <-n.stepdown: n.switchToFollower(newLeader) return - case ar := <-n.resp: - n.trackPeer(ar.peer) - if ar.success { - n.trackResponse(ar) - } else if ar.reply != _EMPTY_ { - n.catchupFollower(ar) - } } } } @@ -1565,7 +1570,7 @@ func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) { n.debug("Running catchup for %q", peer) - const maxOutstanding = 48 * 1024 * 1024 // 48MB for now. + const maxOutstanding = 2 * 1024 * 1024 // 2MB for now. next, total, om := uint64(0), 0, make(map[uint64]int) sendNext := func() { @@ -2103,6 +2108,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.Unlock() return } + if ps, err := decodePeerState(ae.entries[1].Data); err == nil { n.processPeerState(ps) // Also need to copy from client's buffer. @@ -2226,14 +2232,15 @@ func (n *raft) processPeerState(ps *peerState) { // handleAppendEntryResponse just places the decoded response on the appropriate channel. 
func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject, reply string, msg []byte) { - aer := n.decodeAppendEntryResponse(msg) + ar := n.decodeAppendEntryResponse(msg) if reply != _EMPTY_ { - aer.reply = reply + ar.reply = reply } - select { - case n.resp <- aer: - default: - n.error("Failed to place add entry response on chan for %q", n.group) + n.trackPeer(ar.peer) + if ar.success { + n.trackResponse(ar) + } else if ar.reply != _EMPTY_ { + n.catchupFollower(ar) } } From ddc800174ffdd631b73be9e45270cb2e62766c7e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Feb 2021 13:30:02 -0800 Subject: [PATCH 4/6] Stabilize catchups and snapshot logic Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 102 ++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c75103fd..085f0760 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -739,11 +739,7 @@ func (js *jetStream) monitorCluster() { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() - } - } - // See if we could save some memory here. 
- if lastSnap != nil { - if _, b := n.Size(); b > uint64(len(lastSnap)*4) { + } else if _, b := n.Size(); lastSnap != nil && b > uint64(len(lastSnap)*4) { doSnapshot() } } @@ -1158,9 +1154,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { defer s.Debugf("Exiting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) const ( - compactInterval = 2 * time.Minute - compactSizeLimit = 32 * 1024 * 1024 - compactNumLimit = 4096 + compactInterval = 2 * time.Minute + compactSizeMin = 4 * 1024 * 1024 + compactNumMin = 32 ) t := time.NewTicker(compactInterval) @@ -1178,6 +1174,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { } var lastSnap []byte + var lastApplied uint64 // Should only to be called from leader. doSnapshot := func() { @@ -1211,21 +1208,16 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { // Apply our entries. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { n.Applied(ce.Index) - if !isLeader { - // If over 32MB go ahead and compact if not the leader. - if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit { - doSnapshot() - } + ne := ce.Index - lastApplied + lastApplied = ce.Index + + // If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact. + if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne > compactNumMin) { + doSnapshot() } } else { s.Warnf("Error applying entries to '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) } - if isLeader && lastSnap != nil { - // If over 32MB go ahead and compact if not the leader. 
- if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit { - doSnapshot() - } - } case isLeader = <-lch: if isLeader && isRestore { acc, _ := s.LookupAccount(sa.Client.serviceAccount()) @@ -1235,6 +1227,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { js.setStreamAssignmentResponded(sa) } js.processStreamLeaderChange(mset, isLeader) + if isLeader { + lastSnap = nil + } } case <-t.C: if isLeader { @@ -1341,7 +1336,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error { for _, e := range ce.Entries { if e.Type == EntrySnapshot { - if !isRecovering { + if !isRecovering && mset != nil { var snap streamSnapshot if err := json.Unmarshal(e.Data, &snap); err != nil { return err @@ -2271,15 +2266,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { js.mu.RUnlock() const ( - compactInterval = 2 * time.Minute - compactSizeLimit = 4 * 1024 * 1024 - compactNumLimit = 4096 + compactInterval = 2 * time.Minute + compactSizeMin = 8 * 1024 * 1024 + compactNumMin = 256 ) t := time.NewTicker(compactInterval) defer t.Stop() var lastSnap []byte + var lastApplied uint64 // Should only to be called from leader. doSnapshot := func() { @@ -2305,23 +2301,24 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } if err := js.applyConsumerEntries(o, ce); err == nil { n.Applied(ce.Index) - // If over 4MB go ahead and compact if not the leader. - if !isLeader { - if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit { - n.Compact(ce.Index) - } - } - } - if isLeader && lastSnap != nil { - if _, b := n.Size(); b > uint64(len(lastSnap)*4) { + ne := ce.Index - lastApplied + lastApplied = ce.Index + + // If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact. 
+ if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne > compactNumMin) { doSnapshot() } + } else { + s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) } case isLeader := <-lch: if !isLeader && n.GroupLeader() != noLeader { js.setConsumerAssignmentResponded(ca) } js.processConsumerLeaderChange(o, isLeader) + if isLeader { + lastSnap = nil + } case <-t.C: if isLeader { doSnapshot() @@ -3650,6 +3647,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) { mset.mu.Lock() state := mset.store.State() + sreq := mset.calculateSyncRequest(&state, snap) s, subject, n := mset.srv, mset.sa.Sync, mset.node mset.mu.Unlock() @@ -3701,22 +3699,27 @@ RETRY: } } - msgsC := make(chan []byte, 8*1024) + type mr struct { + msg []byte + reply string + } + msgsC := make(chan *mr, 32768) // Send our catchup request here. reply := syncReplySubject() sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _, reply string, msg []byte) { // Make copies - https://github.com/go101/go101/wiki // TODO(dlc) - Since we are using a buffer from the inbound client/route. - if len(msg) > 0 { - msg = append(msg[:0:0], msg...) - } - msgsC <- msg - if reply != _EMPTY_ { - s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) + select { + case msgsC <- &mr{msg: append(msg[:0:0], msg...), reply: reply}: + default: + s.Warnf("Failed to place catchup message onto internal channel: %d pending", len(msgsC)) + return } + }) if err != nil { + s.Errorf("Could not subscribe to stream catchup: %v", err) return } @@ -3730,8 +3733,9 @@ RETRY: // Run our own select loop here. for qch, lch := n.QuitC(), n.LeadChangeC(); ; { select { - case msg := <-msgsC: + case mrec := <-msgsC: notActive.Reset(activityInterval) + msg := mrec.msg // Check eof signaling. 
if len(msg) == 0 { goto RETRY @@ -3743,6 +3747,9 @@ RETRY: } else { goto RETRY } + if mrec.reply != _EMPTY_ { + s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil) + } case <-notActive.C: s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name()) notActive.Reset(activityInterval) @@ -3753,7 +3760,6 @@ RETRY: return case isLeader := <-lch: js.processStreamLeaderChange(mset, isLeader) - return } } } @@ -3856,8 +3862,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { s := mset.srv defer s.grWG.Done() - const maxOut = int64(32 * 1024 * 1024) // 32MB for now. - out := int64(0) + const maxOutBytes = int64(2 * 1024 * 1024) // 2MB for now. + const maxOutMsgs = int32(16384) + outb := int64(0) + outm := int32(0) // Flow control processing. ackReplySize := func(subj string) int64 { @@ -3874,7 +3882,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { ackReply := syncAckSubject() ackSub, _ := s.sysSubscribe(ackReply, func(sub *subscription, c *client, subject, reply string, msg []byte) { sz := ackReplySize(subject) - atomic.AddInt64(&out, -sz) + atomic.AddInt64(&outb, -sz) + atomic.AddInt32(&outm, -1) select { case nextBatchC <- struct{}{}: default: @@ -3894,7 +3903,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { seq, last := sreq.FirstSeq, sreq.LastSeq sendNextBatch := func() { - for ; seq <= last && atomic.LoadInt64(&out) <= maxOut; seq++ { + for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs; seq++ { subj, hdr, msg, ts, err := mset.store.LoadMsg(seq) // if this is not a deleted msg, bail out. if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg { @@ -3906,7 +3915,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { em := encodeStreamMsg(subj, _EMPTY_, hdr, msg, seq, ts) // Place size in reply subject for flow control. 
reply := fmt.Sprintf(ackReplyT, len(em)) - atomic.AddInt64(&out, int64(len(em))) + atomic.AddInt64(&outb, int64(len(em))) + atomic.AddInt32(&outm, 1) s.sendInternalMsgLocked(sendSubject, reply, nil, em) } } From eecec2aed12bcc5f9e1fa5cb93616ab769186daa Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Feb 2021 14:09:03 -0800 Subject: [PATCH 5/6] Increase due to sendq Signed-off-by: Derek Collison --- test/norace_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/norace_test.go b/test/norace_test.go index 85aab500..8d3f07df 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -331,11 +331,11 @@ func TestNoRaceLargeClusterMem(t *testing.T) { checkClusterFormed(t, servers...) // Calculate in MB what we are using now. - const max = 50 * 1024 * 1024 // 50MB + const max = 60 * 1024 * 1024 // 60MB runtime.ReadMemStats(&m) used := m.TotalAlloc - pta if used > max { - t.Fatalf("Cluster using too much memory, expect < 50MB, got %dMB", used/(1024*1024)) + t.Fatalf("Cluster using too much memory, expect < 60MB, got %dMB", used/(1024*1024)) } for _, s := range servers { From ddc4cc79d223d85bb6483650f8a5d207a61ddb83 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Feb 2021 15:58:46 -0800 Subject: [PATCH 6/6] Make sure to not process AR when no longer leader Signed-off-by: Derek Collison --- server/raft.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/raft.go b/server/raft.go index 4ed24a7f..d70027cf 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2230,8 +2230,13 @@ func (n *raft) processPeerState(ps *peerState) { writePeerState(n.sd, ps) } -// handleAppendEntryResponse just places the decoded response on the appropriate channel. +// handleAppendEntryResponse processes responses to append entries. func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject, reply string, msg []byte) { + // Ignore if not the leader. 
+ if !n.Leader() { + n.debug("Ignoring append entry response, no longer leader") + return + } ar := n.decodeAppendEntryResponse(msg) if reply != _EMPTY_ { ar.reply = reply