Merge pull request #2191 from nats-io/raft_catchup_snap

[FIXED] Raft groups could continually spin trying to catch up.
This commit is contained in:
Derek Collison
2021-05-07 14:20:37 -07:00
committed by GitHub
5 changed files with 87 additions and 55 deletions

View File

@@ -1111,9 +1111,9 @@ func (o *consumer) processAck(_ *subscription, c *client, subject, reply string,
switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.ackMsg(sseq, dseq, dc)
o.processAckMsg(sseq, dseq, dc, true)
case bytes.HasPrefix(msg, AckNext):
o.ackMsg(sseq, dseq, dc)
o.processAckMsg(sseq, dseq, dc, true)
// processNextMsgReq can be invoked from an internal subscription or from here.
// Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):]
// with current c.pa.hdr because it would cause a panic. We will save the current
@@ -1503,11 +1503,6 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
o.sendAdvisory(o.ackEventT, j)
}
// Process an ack for a message.
func (o *consumer) ackMsg(sseq, dseq, dc uint64) {
o.processAckMsg(sseq, dseq, dc, true)
}
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
o.mu.Lock()
var sagap uint64
@@ -1608,8 +1603,10 @@ func (o *consumer) needAck(sseq uint64) bool {
}
state, err := o.store.State()
if err != nil || state == nil {
// Fall back to what we track internally for now.
needsAck := sseq > o.asflr
o.mu.RUnlock()
return false
return needsAck
}
asflr, osseq = state.AckFloor.Stream, o.sseq
pending = state.Pending

View File

@@ -2382,6 +2382,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
ourID = cc.meta.ID()
}
var isMember bool
if ca.Group != nil && ourID != _EMPTY_ {
isMember = ca.Group.isMember(ourID)
}
@@ -4037,9 +4038,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// We need to set the ephemeral here before replicating.
var oname string
if !isDurableConsumer(cfg) {
// We chose to have ephemerals be R=1.
rg.Peers = []string{rg.Preferred}
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
// We chose to have ephemerals be R=1 unless stream is interest or workqueue.
if sa.Config.Retention == LimitsPolicy {
rg.Peers = []string{rg.Preferred}
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
}
// Make sure name is unique.
for {
oname = createConsumerName()

View File

@@ -2049,22 +2049,26 @@ func TestJetStreamClusterMirrorAndSourceWorkQueues(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Allow sync consumers to connect.
time.Sleep(500 * time.Millisecond)
if _, err = js.Publish("foo", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
time.Sleep(250 * time.Millisecond)
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
return fmt.Errorf("Expected no msgs for %q, got %d", "WQ22", si.State.Msgs)
}
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg for %q, got %d", "M", si.State.Msgs)
}
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg for %q, got %d", "S", si.State.Msgs)
}
return nil
})
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
t.Fatalf("Expected no msgs, got %d", si.State.Msgs)
}
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
}
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
}
}
func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) {
@@ -2102,23 +2106,26 @@ func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Allow sync consumers to connect.
time.Sleep(500 * time.Millisecond)
if _, err = js.Publish("foo", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
time.Sleep(250 * time.Millisecond)
// This one will be 0 since no other interest exists.
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
t.Fatalf("Expected no msgs, got %d", si.State.Msgs)
}
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
}
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
}
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
// This one will be 0 since no other interest exists.
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 0 {
return fmt.Errorf("Expected no msgs for %q, got %d", "WQ22", si.State.Msgs)
}
if si, _ := js.StreamInfo("M"); si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg for %q, got %d", "M", si.State.Msgs)
}
if si, _ := js.StreamInfo("S"); si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg for %q, got %d", "S", si.State.Msgs)
}
return nil
})
// Now create other interest on WQ22.
sub, err := js.SubscribeSync("foo")
@@ -2126,23 +2133,26 @@ func TestJetStreamClusterMirrorAndSourceInterestPolicyStream(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// Allow consumer state to propagate.
time.Sleep(500 * time.Millisecond)
if _, err = js.Publish("foo", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
time.Sleep(250 * time.Millisecond)
// This one should be 1 now since we will hold for the other subscriber.
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", si.State.Msgs)
}
if si, _ := js.StreamInfo("M"); si.State.Msgs != 2 {
t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs)
}
if si, _ := js.StreamInfo("S"); si.State.Msgs != 2 {
t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs)
}
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
// This one should be 1 now since we will hold for the other subscriber.
if si, _ := js.StreamInfo("WQ22"); si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg for %q, got %d", "WQ22", si.State.Msgs)
}
if si, _ := js.StreamInfo("M"); si.State.Msgs != 2 {
return fmt.Errorf("Expected 2 msgs for %q, got %d", "M", si.State.Msgs)
}
if si, _ := js.StreamInfo("S"); si.State.Msgs != 2 {
return fmt.Errorf("Expected 2 msgs for %q, got %d", "S", si.State.Msgs)
}
return nil
})
}
func TestJetStreamClusterInterestRetentionWithFilteredConsumers(t *testing.T) {

View File

@@ -893,7 +893,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
return err
}
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
if _, err := n.wal.Compact(snap.lastIndex); err != nil {
n.Unlock()
n.setWriteErr(err)
return err
@@ -989,6 +989,7 @@ func (n *raft) setupLastSnapshot() {
n.pindex = snap.lastIndex
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.applyc <- &CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
@@ -1936,8 +1937,8 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
ae.pterm, ae.pindex = snap.lastTerm, snap.lastIndex
var state StreamState
n.wal.FastState(&state)
if snap.lastIndex+1 != state.FirstSeq && state.FirstSeq != 0 {
snap.lastIndex = state.FirstSeq - 1
if snap.lastIndex < state.FirstSeq && state.FirstSeq != 0 {
snap.lastIndex = state.FirstSeq
ae.pindex = snap.lastIndex
}
n.sendRPC(subject, n.areply, ae.encode())
@@ -1965,14 +1966,19 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
var state StreamState
n.wal.FastState(&state)
if start < state.FirstSeq {
if start < state.FirstSeq || state.Msgs == 0 && start <= state.LastSeq {
n.debug("Need to send snapshot to follower")
if lastIndex, err := n.sendSnapshotToFollower(ar.reply); err != nil {
n.error("Error sending snapshot to follower [%s]: %v", ar.peer, err)
n.attemptStepDown(noLeader)
n.Unlock()
return
} else {
// If no other entries can just return here.
if state.Msgs == 0 {
n.debug("Finished catching up")
n.Unlock()
return
}
n.debug("Snapshot sent, reset first entry to %d", lastIndex)
start = lastIndex
}

View File

@@ -160,6 +160,9 @@ type stream struct {
// Sources
sources map[string]*sourceInfo
// Indicates we have direct consumers.
directs int
// For flowcontrol processing for source and mirror internal consumers.
fcr map[uint64]string
@@ -3004,12 +3007,18 @@ func (mset *stream) setConsumer(o *consumer) {
if o.cfg.FilterSubject != _EMPTY_ {
mset.numFilter++
}
if o.cfg.Direct {
mset.directs++
}
}
func (mset *stream) removeConsumer(o *consumer) {
if o.cfg.FilterSubject != _EMPTY_ {
mset.numFilter--
}
if o.cfg.Direct {
mset.directs--
}
delete(mset.consumers, o.name)
}
@@ -3080,16 +3089,23 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
}
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(obs *consumer, seq uint64) {
func (mset *stream) ackMsg(o *consumer, seq uint64) {
switch mset.cfg.Retention {
case LimitsPolicy:
return
case WorkQueuePolicy:
mset.store.RemoveMsg(seq)
// Normally we just remove a message when its ack'd here but if we have direct consumers
// from sources and/or mirrors we need to make sure they have delivered the msg.
mset.mu.RLock()
shouldRemove := mset.directs <= 0 || !mset.checkInterest(seq, o)
mset.mu.RUnlock()
if shouldRemove {
mset.store.RemoveMsg(seq)
}
case InterestPolicy:
mset.mu.Lock()
hasInterest := mset.checkInterest(seq, obs)
mset.mu.Unlock()
mset.mu.RLock()
hasInterest := mset.checkInterest(seq, o)
mset.mu.RUnlock()
if !hasInterest {
mset.store.RemoveMsg(seq)
}