mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Merge pull request #1918 from nats-io/bump
Some stability improvements to raft lib and catchup stream processing.
This commit is contained in:
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.2.0-beta.80"
|
||||
VERSION = "2.2.0-beta.82"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -3647,7 +3647,6 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
|
||||
|
||||
mset.mu.Lock()
|
||||
state := mset.store.State()
|
||||
|
||||
sreq := mset.calculateSyncRequest(&state, snap)
|
||||
s, subject, n := mset.srv, mset.sa.Sync, mset.node
|
||||
mset.mu.Unlock()
|
||||
@@ -3736,9 +3735,9 @@ RETRY:
|
||||
case mrec := <-msgsC:
|
||||
notActive.Reset(activityInterval)
|
||||
msg := mrec.msg
|
||||
// Check eof signaling.
|
||||
// Check for eof signaling.
|
||||
if len(msg) == 0 {
|
||||
goto RETRY
|
||||
return
|
||||
}
|
||||
if lseq, err := mset.processCatchupMsg(msg); err == nil {
|
||||
if lseq >= last {
|
||||
|
||||
@@ -369,6 +369,11 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
|
||||
|
||||
for index := state.FirstSeq; index <= state.LastSeq; index++ {
|
||||
ae, err := n.loadEntry(index)
|
||||
if ae.pindex != index-1 {
|
||||
n.warn("Corrupt WAL, truncating")
|
||||
n.wal.Truncate(index - 1)
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
panic("err loading entry from WAL")
|
||||
}
|
||||
@@ -1967,6 +1972,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
|
||||
inbox := n.newInbox()
|
||||
sub, _ := n.subscribe(inbox, n.handleAppendEntry)
|
||||
n.catchup.sub = sub
|
||||
|
||||
return inbox
|
||||
}
|
||||
|
||||
@@ -1982,6 +1988,7 @@ func (n *raft) attemptStepDown(newLeader string) {
|
||||
// processAppendEntry will process an appendEntry.
|
||||
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
n.Lock()
|
||||
|
||||
// Just return if closed.
|
||||
if n.state == Closed {
|
||||
n.Unlock()
|
||||
@@ -2004,7 +2011,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
// Catching up state.
|
||||
catchingUp := n.catchup != nil
|
||||
// Is this a new entry?
|
||||
isNew := sub == n.aesub
|
||||
isNew := sub != nil && sub == n.aesub
|
||||
|
||||
// Track leader directly
|
||||
if isNew && ae.leader != noLeader {
|
||||
@@ -2024,7 +2031,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
|
||||
// If we are catching up, ignore old catchup subs.
|
||||
// This could happen when we stall or cancel a catchup.
|
||||
if !isNew && sub != n.catchup.sub {
|
||||
if !isNew && n.catchup != nil && sub != n.catchup.sub {
|
||||
n.Unlock()
|
||||
n.debug("AppendEntry ignoring old entry from previous catchup")
|
||||
return
|
||||
@@ -2082,7 +2089,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
if eae, err := n.loadEntry(ae.pindex); err == nil && eae != nil {
|
||||
// If terms mismatched, delete that entry and all others past it.
|
||||
if eae.pterm != ae.pterm {
|
||||
n.wal.Truncate(ae.pindex)
|
||||
n.wal.Truncate(ae.pindex - 1)
|
||||
n.pindex = ae.pindex
|
||||
n.pterm = ae.pterm
|
||||
ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
|
||||
@@ -2099,7 +2106,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
|
||||
// Check if we are catching up. If we are here, we know the leader did not have all of the entries
|
||||
// so make sure this is a snapshot entry. If it is not, start the catchup process again since it
|
||||
// means we missed additional messages.
|
||||
// means we may have missed additional messages.
|
||||
if catchingUp {
|
||||
// Snapshots and peerstate will always be together when a leader is catching us up.
|
||||
if len(ae.entries) != 2 || ae.entries[0].Type != EntrySnapshot || ae.entries[1].Type != EntryPeerState {
|
||||
@@ -2268,6 +2275,15 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
|
||||
fmt.Printf("[%s] n is %+v\n\n", n.s, n)
|
||||
fmt.Printf("[%s] n.catchup is %+v\n", n.s, n.catchup)
|
||||
fmt.Printf("[%s] n.wal is %+v\n", n.s, n.wal.State())
|
||||
if state := n.wal.State(); state.Msgs > 0 {
|
||||
for index := state.FirstSeq; index <= state.LastSeq; index++ {
|
||||
nae, _ := n.loadEntry(index)
|
||||
fmt.Printf("INDEX %d is %+v\n", index, nae)
|
||||
for _, e := range nae.entries {
|
||||
fmt.Printf("Entry type is %v\n", e.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
panic(fmt.Sprintf("[%s-%s] Placed an entry at the wrong index, ae is %+v, seq is %d, n.pindex is %d\n\n", n.s, n.group, ae, seq, n.pindex))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user