Merge pull request #1922 from nats-io/s2

Various bug fixes; WAL/snapshot stability improvements
This commit is contained in:
Derek Collison
2021-02-18 09:52:17 -07:00
committed by GitHub
3 changed files with 30 additions and 20 deletions

View File

@@ -2439,8 +2439,8 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
}
// Default chunk size for now.
const defaultSnapshotChunkSize = 128 * 1024
const defaultSnapshotWindowSize = 16 * 1024 * 1024 // 16MB
const defaultSnapshotChunkSize = 256 * 1024
const defaultSnapshotWindowSize = 32 * 1024 * 1024 // 32MB
// streamSnapshot will stream out our snapshot to the reply subject.
func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
@@ -2477,13 +2477,15 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
// We will place sequence number and size of chunk sent in the reply.
ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
ackSub, _ := mset.subscribeInternalUnlocked(ackSubj+".>", func(_ *subscription, _ *client, subject, _ string, _ []byte) {
cs, _ := strconv.Atoi(tokenAt(subject, 6))
// This is very crude and simple, but ok for now.
// This only matters when sending multiple chunks.
if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
acks <- struct{}{}
if atomic.AddInt32(&out, int32(-cs)) < defaultSnapshotWindowSize {
select {
case acks <- struct{}{}:
default:
}
}
cs, _ := strconv.Atoi(tokenAt(subject, 6))
atomic.AddInt32(&out, int32(-cs))
})
defer mset.unsubscribeUnlocked(ackSub)
@@ -2507,7 +2509,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
case <-acks:
case <-inch: // Lost interest
goto done
case <-time.After(time.Millisecond):
case <-time.After(10 * time.Millisecond):
}
}
// TODO(dlc) - Might want these moved off sendq if we have contention.

View File

@@ -1158,8 +1158,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
const (
compactInterval = 2 * time.Minute
compactSizeMin = 4 * 1024 * 1024
compactNumMin = 32
compactSizeMin = 64 * 1024 * 1024
compactNumMin = 8
)
t := time.NewTicker(compactInterval)
@@ -1187,6 +1187,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
if snap := mset.stateSnapshot(); !bytes.Equal(lastSnap, snap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap = snap
_, _, lastApplied = n.Progress()
}
}
}
@@ -1212,10 +1213,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
n.Applied(ce.Index)
ne := ce.Index - lastApplied
lastApplied = ce.Index
// If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact.
if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne > compactNumMin) {
if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne >= compactNumMin) {
doSnapshot()
}
} else {
@@ -2271,7 +2270,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
const (
compactInterval = 2 * time.Minute
compactSizeMin = 8 * 1024 * 1024
compactNumMin = 256
compactNumMin = 64
)
t := time.NewTicker(compactInterval)
@@ -2286,6 +2285,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if snap := encodeConsumerState(state); !bytes.Equal(lastSnap, snap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap = snap
_, _, lastApplied = n.Progress()
}
}
}
@@ -2305,8 +2305,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if err := js.applyConsumerEntries(o, ce); err == nil {
n.Applied(ce.Index)
ne := ce.Index - lastApplied
lastApplied = ce.Index
// If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact.
if _, b := n.Size(); lastSnap == nil || (b > compactSizeMin && ne > compactNumMin) {
doSnapshot()
@@ -3680,6 +3678,16 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
if sub != nil {
s.sysUnsubscribe(sub)
}
// Make sure any consumers are updated for the pending amounts.
mset.mu.Lock()
for _, o := range mset.consumers {
o.mu.Lock()
if o.isLeader() {
o.setInitialPending()
}
o.mu.Unlock()
}
mset.mu.Unlock()
}()
RETRY:

View File

@@ -355,7 +355,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
// TODO(dlc) - Recover our state here.
if first, err := n.loadFirstEntry(); err == nil {
n.pterm, n.pindex = first.pterm, first.pindex
if first.commit > 0 {
if first.commit > 0 && first.commit > n.commit {
n.commit = first.commit
}
}
@@ -684,7 +684,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
return errNodeClosed
}
if state := n.wal.State(); state.LastSeq == n.applied {
if state := n.wal.State(); state.FirstSeq == n.applied {
n.Unlock()
return nil
}
@@ -1588,7 +1588,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesC <-chan uint64)
ae, err := n.loadEntry(next)
if err != nil {
if err != ErrStoreEOF {
n.debug("Got an error loading %d index: %v", next, err)
n.warn("Got an error loading %d index: %v", next, err)
}
return true
}
@@ -1735,7 +1735,7 @@ func (n *raft) applyCommit(index uint64) error {
ae, err := n.loadEntry(index)
if err != nil {
if err != ErrStoreClosed {
n.debug("Got an error loading %d index: %v", index, err)
n.warn("Got an error loading %d index: %v", index, err)
}
n.commit = original
return errEntryLoadFailed
@@ -2166,7 +2166,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Unlock()
return
}
n.debug("Error storing to WAL: %v", err)
n.warn("Error storing to WAL: %v", err)
//FIXME(dlc)!!, WARN AT LEAST, RESPOND FALSE, return etc!