From 6df4403913c90836e8b106627aefc2559a1c5282 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 24 Aug 2023 16:30:16 +0200 Subject: [PATCH 1/3] Fix flaky TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug Signed-off-by: Tomasz Pietrek --- server/jetstream_cluster_3_test.go | 63 ++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index c7ac26fe..8cf77adf 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3155,40 +3155,57 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) { rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] }) for _, m := range msgs { - m.AckSync() + if err := m.AckSync(); err != nil { + t.Fatalf("Ack failed :%+v", err) + } } - checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) { + checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) error { expectedDelivered := uint64(num) + 1 if delivered.Stream != expectedDelivered || delivered.Consumer != expectedDelivered { - t.Fatalf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered) + return fmt.Errorf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered) } expectedAck := uint64(num) if ackFloor.Stream != expectedAck || ackFloor.Consumer != expectedAck { - t.Fatalf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor) + return fmt.Errorf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor) } - require_True(t, numAckPending == 1) + if numAckPending != 1 { + return errors.New("Expected num ack pending to be 1") + } + return nil } ci, err := js.ConsumerInfo("TEST", "C") require_NoError(t, err) - checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending) + require_NoError(t, checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)) // Check each consumer on each server for it's store state and make sure it matches as well. - for _, s := range c.servers { - mset, err := s.GlobalAccount().lookupStream("TEST") - require_NoError(t, err) - require_NotNil(t, mset) - o := mset.lookupConsumer("C") - require_NotNil(t, o) + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + if err != nil { + return err + } + if mset == nil { + return errors.New("Mset should not be nil") + } + o := mset.lookupConsumer("C") + if o == nil { + return errors.New("Consumer should not be nil") + } - state, err := o.store.State() - require_NoError(t, err) - - delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer} - ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer} - checkConsumerState(delivered, ackFloor, len(state.Pending)) - } + state, err := o.store.State() + if err != nil { + return err + } + delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer} + ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer} + if err := checkConsumerState(delivered, ackFloor, len(state.Pending)); err != nil { + return err + } + } + return nil + }) // Now stepdown the consumer and move its leader and check the state after transition. // Make the restarted server the eventual leader. @@ -3216,8 +3233,12 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) { cl := c.consumerLeader(globalAccountName, "TEST", "C") seen[cl] = true ci, err := js.ConsumerInfo("TEST", "C") - require_NoError(t, err) - checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending) + if err != nil { + return err + } + if err := checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending); err != nil { + return err + } cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") return fmt.Errorf("Not all servers have been consumer leader yet") }) From 22ed97c6c9d7bd8f4969172bc3b5a1514f642139 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 25 Aug 2023 08:59:47 -0700 Subject: [PATCH 2/3] Fix for purge with keep bug and improved search for large number of blocks. Signed-off-by: Derek Collison --- server/filestore.go | 43 +++++++++++++++++----------------------- server/filestore_test.go | 20 +++++++++++++++++++ 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 2857abec..e30e8a77 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4008,28 +4008,30 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { return -1, nil } - // Starting index, defaults to beginning. - si := 0 + const linearThresh = 32 + nb := len(fs.blks) - 1 - // TODO(dlc) - Use new AVL and make this real for anything beyond certain size. - // Max threshold before we probe for a starting block to start our linear search. - const maxl = 256 - if nb := len(fs.blks); nb > maxl { - d := nb / 8 - for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} { - mb := fs.blks[i] + if nb < linearThresh { + for i, mb := range fs.blks { if seq <= atomic.LoadUint64(&mb.last.seq) { - break + return i, mb } - si = i } + return -1, nil } - // blks are sorted in ascending order. - for i := si; i < len(fs.blks); i++ { - mb := fs.blks[i] - if seq <= atomic.LoadUint64(&mb.last.seq) { - return i, mb + // Do traditional binary search here since we know the blocks are sorted by sequence first and last. + for low, high, mid := 0, nb, nb/2; low <= high; mid = (low + high) / 2 { + mb := fs.blks[mid] + // Right now these atomic loads do not factor in, so fine to leave. Was considering + // uplifting these to fs scope to avoid atomic load but not needed. + first, last := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) + if seq > last { + low = mid + 1 + } else if seq < first { + high = mid - 1 + } else { + return mid, mb } } @@ -5205,16 +5207,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } if sequence > 1 { return fs.Compact(sequence) - } else if keep > 0 { - fs.mu.RLock() - msgs, lseq := fs.state.Msgs, fs.state.LastSeq - fs.mu.RUnlock() - if keep >= msgs { - return 0, nil - } - return fs.Compact(lseq - keep + 1) } - return 0, nil } eq, wc := compareFn(subject), subjectHasWildcard(subject) diff --git a/server/filestore_test.go b/server/filestore_test.go index 6c902f2a..81032303 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5569,3 +5569,23 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } + +func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) { + fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + + msg := bytes.Repeat([]byte("A"), 19) + for i := 0; i < 5; i++ { + fs.StoreMsg("A", nil, msg) + fs.StoreMsg("B", nil, msg) + } + + n, err := fs.PurgeEx("A", 0, 0) + require_NoError(t, err) + require_True(t, n == 5) + + // Purge with keep. + n, err = fs.PurgeEx(_EMPTY_, 0, 2) + require_NoError(t, err) + require_True(t, n == 3) +} From e5625b9d9b6870518e84a0a1ec2bc0b718b5bd59 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 25 Aug 2023 10:15:01 -0700 Subject: [PATCH 3/3] If a leader is asked for an item and we have no items left, make sure to also step-down. Signed-off-by: Derek Collison --- server/raft.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/raft.go b/server/raft.go index daf5493a..bee20a1f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2509,6 +2509,10 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { } if err != nil || ae == nil { n.warn("Could not find a starting entry for catchup request: %v", err) + // If we are here we are seeing a request for an item we do not have, meaning we should stepdown. + // This is possible on a reset of our WAL but the other side has a snapshot already. + // If we do not stepdown this can cycle. + n.stepdown.push(noLeader) n.Unlock() arPool.Put(ar) return