mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Merge pull request #1916 from nats-io/better_restore
Better restore logic
This commit is contained in:
@@ -739,11 +739,7 @@ func (js *jetStream) monitorCluster() {
|
||||
// Since we received one make sure we have our own since we do not store
|
||||
// our meta state outside of raft.
|
||||
doSnapshot()
|
||||
}
|
||||
}
|
||||
// See if we could save some memory here.
|
||||
if lastSnap != nil {
|
||||
if _, b := n.Size(); b > uint64(len(lastSnap)*4) {
|
||||
} else if _, b := n.Size(); lastSnap != nil && b > uint64(len(lastSnap)*4) {
|
||||
doSnapshot()
|
||||
}
|
||||
}
|
||||
@@ -1158,9 +1154,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
defer s.Debugf("Exiting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
|
||||
const (
|
||||
compactInterval = 2 * time.Minute
|
||||
compactSizeLimit = 32 * 1024 * 1024
|
||||
compactNumLimit = 4096
|
||||
compactInterval = 2 * time.Minute
|
||||
compactSizeMin = 4 * 1024 * 1024
|
||||
compactNumMin = 32
|
||||
)
|
||||
|
||||
t := time.NewTicker(compactInterval)
|
||||
@@ -1178,6 +1174,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
}
|
||||
|
||||
var lastSnap []byte
|
||||
var lastApplied uint64
|
||||
|
||||
// Should only to be called from leader.
|
||||
doSnapshot := func() {
|
||||
@@ -1211,21 +1208,16 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
// Apply our entries.
|
||||
if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
|
||||
n.Applied(ce.Index)
|
||||
if !isLeader {
|
||||
// If over 32MB go ahead and compact if not the leader.
|
||||
if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit {
|
||||
doSnapshot()
|
||||
}
|
||||
ne := ce.Index - lastApplied
|
||||
lastApplied = ce.Index
|
||||
|
||||
// If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact.
|
||||
if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne > compactNumMin) {
|
||||
doSnapshot()
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Error applying entries to '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
}
|
||||
if isLeader && lastSnap != nil {
|
||||
// If over 32MB go ahead and compact if not the leader.
|
||||
if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit {
|
||||
doSnapshot()
|
||||
}
|
||||
}
|
||||
case isLeader = <-lch:
|
||||
if isLeader && isRestore {
|
||||
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
|
||||
@@ -1235,6 +1227,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
js.setStreamAssignmentResponded(sa)
|
||||
}
|
||||
js.processStreamLeaderChange(mset, isLeader)
|
||||
if isLeader {
|
||||
lastSnap = nil
|
||||
}
|
||||
}
|
||||
case <-t.C:
|
||||
if isLeader {
|
||||
@@ -1341,7 +1336,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
|
||||
for _, e := range ce.Entries {
|
||||
if e.Type == EntrySnapshot {
|
||||
if !isRecovering {
|
||||
if !isRecovering && mset != nil {
|
||||
var snap streamSnapshot
|
||||
if err := json.Unmarshal(e.Data, &snap); err != nil {
|
||||
return err
|
||||
@@ -2271,15 +2266,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
js.mu.RUnlock()
|
||||
|
||||
const (
|
||||
compactInterval = 2 * time.Minute
|
||||
compactSizeLimit = 4 * 1024 * 1024
|
||||
compactNumLimit = 4096
|
||||
compactInterval = 2 * time.Minute
|
||||
compactSizeMin = 8 * 1024 * 1024
|
||||
compactNumMin = 256
|
||||
)
|
||||
|
||||
t := time.NewTicker(compactInterval)
|
||||
defer t.Stop()
|
||||
|
||||
var lastSnap []byte
|
||||
var lastApplied uint64
|
||||
|
||||
// Should only to be called from leader.
|
||||
doSnapshot := func() {
|
||||
@@ -2305,23 +2301,24 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
}
|
||||
if err := js.applyConsumerEntries(o, ce); err == nil {
|
||||
n.Applied(ce.Index)
|
||||
// If over 4MB go ahead and compact if not the leader.
|
||||
if !isLeader {
|
||||
if m, b := n.Size(); m > compactNumLimit || b > compactSizeLimit {
|
||||
n.Compact(ce.Index)
|
||||
}
|
||||
}
|
||||
}
|
||||
if isLeader && lastSnap != nil {
|
||||
if _, b := n.Size(); b > uint64(len(lastSnap)*4) {
|
||||
ne := ce.Index - lastApplied
|
||||
lastApplied = ce.Index
|
||||
|
||||
// If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact.
|
||||
if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne > compactNumMin) {
|
||||
doSnapshot()
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
|
||||
}
|
||||
case isLeader := <-lch:
|
||||
if !isLeader && n.GroupLeader() != noLeader {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerLeaderChange(o, isLeader)
|
||||
if isLeader {
|
||||
lastSnap = nil
|
||||
}
|
||||
case <-t.C:
|
||||
if isLeader {
|
||||
doSnapshot()
|
||||
@@ -3650,6 +3647,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
|
||||
|
||||
mset.mu.Lock()
|
||||
state := mset.store.State()
|
||||
|
||||
sreq := mset.calculateSyncRequest(&state, snap)
|
||||
s, subject, n := mset.srv, mset.sa.Sync, mset.node
|
||||
mset.mu.Unlock()
|
||||
@@ -3701,22 +3699,27 @@ RETRY:
|
||||
}
|
||||
}
|
||||
|
||||
msgsC := make(chan []byte, 8*1024)
|
||||
type mr struct {
|
||||
msg []byte
|
||||
reply string
|
||||
}
|
||||
msgsC := make(chan *mr, 32768)
|
||||
|
||||
// Send our catchup request here.
|
||||
reply := syncReplySubject()
|
||||
sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _, reply string, msg []byte) {
|
||||
// Make copies - https://github.com/go101/go101/wiki
|
||||
// TODO(dlc) - Since we are using a buffer from the inbound client/route.
|
||||
if len(msg) > 0 {
|
||||
msg = append(msg[:0:0], msg...)
|
||||
}
|
||||
msgsC <- msg
|
||||
if reply != _EMPTY_ {
|
||||
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
|
||||
select {
|
||||
case msgsC <- &mr{msg: append(msg[:0:0], msg...), reply: reply}:
|
||||
default:
|
||||
s.Warnf("Failed to place catchup message onto internal channel: %d pending", len(msgsC))
|
||||
return
|
||||
}
|
||||
|
||||
})
|
||||
if err != nil {
|
||||
s.Errorf("Could not subscribe to stream catchup: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3730,8 +3733,9 @@ RETRY:
|
||||
// Run our own select loop here.
|
||||
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
|
||||
select {
|
||||
case msg := <-msgsC:
|
||||
case mrec := <-msgsC:
|
||||
notActive.Reset(activityInterval)
|
||||
msg := mrec.msg
|
||||
// Check eof signaling.
|
||||
if len(msg) == 0 {
|
||||
goto RETRY
|
||||
@@ -3743,6 +3747,9 @@ RETRY:
|
||||
} else {
|
||||
goto RETRY
|
||||
}
|
||||
if mrec.reply != _EMPTY_ {
|
||||
s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil)
|
||||
}
|
||||
case <-notActive.C:
|
||||
s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name())
|
||||
notActive.Reset(activityInterval)
|
||||
@@ -3753,7 +3760,6 @@ RETRY:
|
||||
return
|
||||
case isLeader := <-lch:
|
||||
js.processStreamLeaderChange(mset, isLeader)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3856,8 +3862,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
|
||||
s := mset.srv
|
||||
defer s.grWG.Done()
|
||||
|
||||
const maxOut = int64(32 * 1024 * 1024) // 32MB for now.
|
||||
out := int64(0)
|
||||
const maxOutBytes = int64(2 * 1024 * 1024) // 2MB for now.
|
||||
const maxOutMsgs = int32(16384)
|
||||
outb := int64(0)
|
||||
outm := int32(0)
|
||||
|
||||
// Flow control processing.
|
||||
ackReplySize := func(subj string) int64 {
|
||||
@@ -3874,7 +3882,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
|
||||
ackReply := syncAckSubject()
|
||||
ackSub, _ := s.sysSubscribe(ackReply, func(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
sz := ackReplySize(subject)
|
||||
atomic.AddInt64(&out, -sz)
|
||||
atomic.AddInt64(&outb, -sz)
|
||||
atomic.AddInt32(&outm, -1)
|
||||
select {
|
||||
case nextBatchC <- struct{}{}:
|
||||
default:
|
||||
@@ -3894,7 +3903,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
|
||||
seq, last := sreq.FirstSeq, sreq.LastSeq
|
||||
|
||||
sendNextBatch := func() {
|
||||
for ; seq <= last && atomic.LoadInt64(&out) <= maxOut; seq++ {
|
||||
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs; seq++ {
|
||||
subj, hdr, msg, ts, err := mset.store.LoadMsg(seq)
|
||||
// if this is not a deleted msg, bail out.
|
||||
if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
|
||||
@@ -3906,7 +3915,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
|
||||
em := encodeStreamMsg(subj, _EMPTY_, hdr, msg, seq, ts)
|
||||
// Place size in reply subject for flow control.
|
||||
reply := fmt.Sprintf(ackReplyT, len(em))
|
||||
atomic.AddInt64(&out, int64(len(em)))
|
||||
atomic.AddInt64(&outb, int64(len(em)))
|
||||
atomic.AddInt32(&outm, 1)
|
||||
s.sendInternalMsgLocked(sendSubject, reply, nil, em)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,12 @@ import (
|
||||
)
|
||||
|
||||
func (s *Server) publishAdvisory(acc *Account, subject string, adv interface{}) {
|
||||
if acc == nil {
|
||||
acc = s.SystemAccount()
|
||||
if acc == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
ej, err := json.Marshal(adv)
|
||||
if err == nil {
|
||||
err = s.sendInternalAccountMsg(acc, subject, ej)
|
||||
|
||||
@@ -42,6 +42,7 @@ type RaftNode interface {
|
||||
Compact(index uint64) error
|
||||
State() RaftState
|
||||
Size() (entries, bytes uint64)
|
||||
Progress() (index, commit, applied uint64)
|
||||
Leader() bool
|
||||
Quorum() bool
|
||||
Current() bool
|
||||
@@ -168,7 +169,6 @@ type raft struct {
|
||||
quit chan struct{}
|
||||
reqs chan *voteRequest
|
||||
votes chan *voteResponse
|
||||
resp chan *appendEntryResponse
|
||||
leadc chan bool
|
||||
stepdown chan string
|
||||
}
|
||||
@@ -329,7 +329,6 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
|
||||
quit: make(chan struct{}),
|
||||
reqs: make(chan *voteRequest, 8),
|
||||
votes: make(chan *voteResponse, 32),
|
||||
resp: make(chan *appendEntryResponse, 256),
|
||||
propc: make(chan *Entry, 256),
|
||||
applyc: make(chan *CommittedEntry, 512),
|
||||
leadc: make(chan bool, 8),
|
||||
@@ -680,14 +679,20 @@ func (n *raft) InstallSnapshot(data []byte) error {
|
||||
return errNodeClosed
|
||||
}
|
||||
|
||||
ae, err := n.loadEntry(n.applied)
|
||||
if err != nil {
|
||||
if state := n.wal.State(); state.LastSeq == n.applied {
|
||||
n.Unlock()
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
var term uint64
|
||||
if ae, err := n.loadEntry(n.applied); err != nil && ae != nil {
|
||||
term = ae.term
|
||||
} else {
|
||||
term = n.term
|
||||
}
|
||||
|
||||
snap := &snapshot{
|
||||
lastTerm: ae.term,
|
||||
lastTerm: term,
|
||||
lastIndex: n.applied,
|
||||
peerstate: encodePeerState(&peerState{n.peerNames(), n.csz}),
|
||||
data: data,
|
||||
@@ -704,7 +709,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
|
||||
|
||||
// Remember our latest snapshot file.
|
||||
n.snapfile = sfile
|
||||
_, err = n.wal.Compact(snap.lastIndex)
|
||||
_, err := n.wal.Compact(snap.lastIndex)
|
||||
n.Unlock()
|
||||
|
||||
psnaps, _ := ioutil.ReadDir(snapDir)
|
||||
@@ -963,13 +968,20 @@ func (n *raft) campaign() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// State return the current state for this node.
|
||||
// State returns the current state for this node.
|
||||
func (n *raft) State() RaftState {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.state
|
||||
}
|
||||
|
||||
// Progress returns the current index, commit and applied values.
|
||||
func (n *raft) Progress() (index, commit, applied uint64) {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.pindex + 1, n.commit, n.applied
|
||||
}
|
||||
|
||||
// Size returns number of entries and total bytes for our WAL.
|
||||
func (n *raft) Size() (uint64, uint64) {
|
||||
n.RLock()
|
||||
@@ -1474,13 +1486,6 @@ func (n *raft) runAsLeader() {
|
||||
case newLeader := <-n.stepdown:
|
||||
n.switchToFollower(newLeader)
|
||||
return
|
||||
case ar := <-n.resp:
|
||||
n.trackPeer(ar.peer)
|
||||
if ar.success {
|
||||
n.trackResponse(ar)
|
||||
} else if ar.reply != _EMPTY_ {
|
||||
n.catchupFollower(ar)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1565,7 +1570,7 @@ func (n *raft) runCatchup(peer, subj string, indexUpdatesC <-chan uint64) {
|
||||
|
||||
n.debug("Running catchup for %q", peer)
|
||||
|
||||
const maxOutstanding = 48 * 1024 * 1024 // 48MB for now.
|
||||
const maxOutstanding = 2 * 1024 * 1024 // 2MB for now.
|
||||
next, total, om := uint64(0), 0, make(map[uint64]int)
|
||||
|
||||
sendNext := func() {
|
||||
@@ -2103,6 +2108,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
n.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if ps, err := decodePeerState(ae.entries[1].Data); err == nil {
|
||||
n.processPeerState(ps)
|
||||
// Also need to copy from client's buffer.
|
||||
@@ -2224,16 +2230,22 @@ func (n *raft) processPeerState(ps *peerState) {
|
||||
writePeerState(n.sd, ps)
|
||||
}
|
||||
|
||||
// handleAppendEntryResponse just places the decoded response on the appropriate channel.
|
||||
// handleAppendEntryResponse processes responses to append entries.
|
||||
func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
aer := n.decodeAppendEntryResponse(msg)
|
||||
if reply != _EMPTY_ {
|
||||
aer.reply = reply
|
||||
// Ignore if not the leader.
|
||||
if !n.Leader() {
|
||||
n.debug("Ignoring append entry response, no longer leader")
|
||||
return
|
||||
}
|
||||
select {
|
||||
case n.resp <- aer:
|
||||
default:
|
||||
n.error("Failed to place add entry response on chan for %q", n.group)
|
||||
ar := n.decodeAppendEntryResponse(msg)
|
||||
if reply != _EMPTY_ {
|
||||
ar.reply = reply
|
||||
}
|
||||
n.trackPeer(ar.peer)
|
||||
if ar.success {
|
||||
n.trackResponse(ar)
|
||||
} else if ar.reply != _EMPTY_ {
|
||||
n.catchupFollower(ar)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1076,7 +1076,7 @@ func (s *Server) SetDefaultSystemAccount() error {
|
||||
}
|
||||
|
||||
// For internal sends.
|
||||
const internalSendQLen = 8192
|
||||
const internalSendQLen = 256 * 1024
|
||||
|
||||
// Assign a system account. Should only be called once.
|
||||
// This sets up a server to send and receive messages from
|
||||
|
||||
@@ -331,11 +331,11 @@ func TestNoRaceLargeClusterMem(t *testing.T) {
|
||||
checkClusterFormed(t, servers...)
|
||||
|
||||
// Calculate in MB what we are using now.
|
||||
const max = 50 * 1024 * 1024 // 50MB
|
||||
const max = 60 * 1024 * 1024 // 60MB
|
||||
runtime.ReadMemStats(&m)
|
||||
used := m.TotalAlloc - pta
|
||||
if used > max {
|
||||
t.Fatalf("Cluster using too much memory, expect < 50MB, got %dMB", used/(1024*1024))
|
||||
t.Fatalf("Cluster using too much memory, expect < 60MB, got %dMB", used/(1024*1024))
|
||||
}
|
||||
|
||||
for _, s := range servers {
|
||||
|
||||
Reference in New Issue
Block a user