Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-08-25 11:03:54 -07:00
4 changed files with 84 additions and 46 deletions

View File

@@ -3155,40 +3155,57 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
for _, m := range msgs {
m.AckSync()
if err := m.AckSync(); err != nil {
t.Fatalf("Ack failed :%+v", err)
}
}
checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) {
checkConsumerState := func(delivered, ackFloor nats.SequenceInfo, numAckPending int) error {
expectedDelivered := uint64(num) + 1
if delivered.Stream != expectedDelivered || delivered.Consumer != expectedDelivered {
t.Fatalf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered)
return fmt.Errorf("Wrong delivered, expected %d got %+v", expectedDelivered, delivered)
}
expectedAck := uint64(num)
if ackFloor.Stream != expectedAck || ackFloor.Consumer != expectedAck {
t.Fatalf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor)
return fmt.Errorf("Wrong ackFloor, expected %d got %+v", expectedAck, ackFloor)
}
require_True(t, numAckPending == 1)
if numAckPending != 1 {
return errors.New("Expected num ack pending to be 1")
}
return nil
}
ci, err := js.ConsumerInfo("TEST", "C")
require_NoError(t, err)
checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
require_NoError(t, checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending))
// Check each consumer on each server for it's store state and make sure it matches as well.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
require_NotNil(t, mset)
o := mset.lookupConsumer("C")
require_NotNil(t, o)
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
if err != nil {
return err
}
if mset == nil {
return errors.New("Mset should not be nil")
}
o := mset.lookupConsumer("C")
if o == nil {
return errors.New("Consumer should not be nil")
}
state, err := o.store.State()
require_NoError(t, err)
delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer}
ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer}
checkConsumerState(delivered, ackFloor, len(state.Pending))
}
state, err := o.store.State()
if err != nil {
return err
}
delivered := nats.SequenceInfo{Stream: state.Delivered.Stream, Consumer: state.Delivered.Consumer}
ackFloor := nats.SequenceInfo{Stream: state.AckFloor.Stream, Consumer: state.AckFloor.Consumer}
if err := checkConsumerState(delivered, ackFloor, len(state.Pending)); err != nil {
return err
}
}
return nil
})
// Now stepdown the consumer and move its leader and check the state after transition.
// Make the restarted server the eventual leader.
@@ -3216,8 +3233,12 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
cl := c.consumerLeader(globalAccountName, "TEST", "C")
seen[cl] = true
ci, err := js.ConsumerInfo("TEST", "C")
require_NoError(t, err)
checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending)
if err != nil {
return err
}
if err := checkConsumerState(ci.Delivered, ci.AckFloor, ci.NumAckPending); err != nil {
return err
}
cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
return fmt.Errorf("Not all servers have been consumer leader yet")
})