From 081140ee67a9422b58dc586bb46666a4b9744bfd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 3 Aug 2023 10:41:10 -0700 Subject: [PATCH] When taking over make sure to sync and reset clfs for clustered streams. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 17 ++- server/jetstream_cluster_3_test.go | 200 +++++++++++++++++++++++++++++ server/stream.go | 8 ++ 3 files changed, 218 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9be5bdf1..6af4fa42 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2291,9 +2291,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case isLeader = <-lch: if isLeader { - if sendSnapshot && mset != nil && n != nil { - n.SendSnapshot(mset.stateSnapshot()) - sendSnapshot = false + if mset != nil && n != nil { + // Send a snapshot if being asked or if we are tracking + // a failed state so that followers sync. + if clfs := mset.clearCLFS(); clfs > 0 || sendSnapshot { + n.SendSnapshot(mset.stateSnapshot()) + sendSnapshot = false + } } if isRestore { acc, _ := s.LookupAccount(sa.Client.serviceAccount()) @@ -2714,15 +2718,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Grab last sequence and CLFS. last, clfs := mset.lastSeqAndCLFS() - // We can skip if we know this is less than what we already have. if lseq-clfs < last { s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", mset.account(), mset.name(), lseq+1-clfs, last) - // Check for any preAcks in case we are interest based. + mset.mu.Lock() - seq := lseq + 1 - mset.clfs - mset.clearAllPreAcks(seq) + // Check for any preAcks in case we are interest based. 
+ mset.clearAllPreAcks(lseq + 1 - mset.clfs) mset.mu.Unlock() continue } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 159cf269..fbf46524 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4785,3 +4785,203 @@ func TestJetStreamAccountUsageDrifts(t *testing.T) { checkAccount(sir1.State.Bytes, sir3.State.Bytes) } } + +func TestJetStreamClusterStreamFailTracking(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + m := nats.NewMsg("foo") + m.Data = []byte("OK") + + b, bsz := 0, 5 + sendBatch := func() { + for i := b * bsz; i < b*bsz+bsz; i++ { + msgId := fmt.Sprintf("ID:%d", i) + m.Header.Set(JSMsgId, msgId) + // Send it twice on purpose. + js.PublishMsg(m) + js.PublishMsg(m) + } + b++ + } + + sendBatch() + + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + + sendBatch() + + // Now stop one and restart. 
+ nl := c.randomNonStreamLeader(globalAccountName, "TEST") + mset, err := nl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + // Reset raft + mset.resetClusteredState(nil) + time.Sleep(100 * time.Millisecond) + + nl.Shutdown() + nl.WaitForShutdown() + + sendBatch() + + nl = c.restartServer(nl) + + sendBatch() + + for { + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + if nl == c.streamLeader(globalAccountName, "TEST") { + break + } + } + + sendBatch() + + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + }) + require_NoError(t, err) + + // Make sure all in order. + errCh := make(chan error, 100) + var wg sync.WaitGroup + wg.Add(1) + + expected, seen := b*bsz, 0 + + sub, err := js.Subscribe("foo", func(msg *nats.Msg) { + expectedID := fmt.Sprintf("ID:%d", seen) + if v := msg.Header.Get(JSMsgId); v != expectedID { + errCh <- err + wg.Done() + msg.Sub.Unsubscribe() + return + } + seen++ + if seen >= expected { + wg.Done() + msg.Sub.Unsubscribe() + } + }) + require_NoError(t, err) + defer sub.Unsubscribe() + + wg.Wait() + if len(errCh) > 0 { + t.Fatalf("Expected no errors, got %d", len(errCh)) + } +} + +func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + m := nats.NewMsg("foo") + m.Data = []byte("OK") + + // Send 1000 messages, with a dupe for every msgID. + for i := 0; i < 1000; i++ { + msgId := fmt.Sprintf("ID:%d", i) + m.Header.Set(JSMsgId, msgId) + // Send it twice on purpose. + js.PublishMsg(m) + js.PublishMsg(m) + } + + // Now stop one. 
+ nl := c.randomNonStreamLeader(globalAccountName, "TEST") + nl.Shutdown() + nl.WaitForShutdown() + + // Now send more and make sure leader snapshots. + for i := 1000; i < 2000; i++ { + msgId := fmt.Sprintf("ID:%d", i) + m.Header.Set(JSMsgId, msgId) + // Send it twice on purpose. + js.PublishMsg(m) + js.PublishMsg(m) + } + + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + node := mset.raftNode() + require_NotNil(t, node) + node.InstallSnapshot(mset.stateSnapshot()) + + // Now restart nl + nl = c.restartServer(nl) + c.waitOnServerCurrent(nl) + + // Move leader to NL + for { + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + if nl == c.streamLeader(globalAccountName, "TEST") { + break + } + } + + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + }) + require_NoError(t, err) + + // Make sure all in order. 
+ errCh := make(chan error, 100) + var wg sync.WaitGroup + wg.Add(1) + + expected, seen := 2000, 0 + + sub, err := js.Subscribe("foo", func(msg *nats.Msg) { + expectedID := fmt.Sprintf("ID:%d", seen) + if v := msg.Header.Get(JSMsgId); v != expectedID { + errCh <- err + wg.Done() + msg.Sub.Unsubscribe() + return + } + seen++ + if seen >= expected { + wg.Done() + msg.Sub.Unsubscribe() + } + }) + require_NoError(t, err) + defer sub.Unsubscribe() + + wg.Wait() + if len(errCh) > 0 { + t.Fatalf("Expected no errors, got %d", len(errCh)) + } +} diff --git a/server/stream.go b/server/stream.go index c9280648..6a9f6206 100644 --- a/server/stream.go +++ b/server/stream.go @@ -838,6 +838,14 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) { return mset.lseq, mset.clfs } +func (mset *stream) clearCLFS() uint64 { + mset.mu.Lock() + defer mset.mu.Unlock() + clfs := mset.clfs + mset.clfs = 0 + return clfs +} + func (mset *stream) lastSeq() uint64 { mset.mu.RLock() lseq := mset.lseq