mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Attempt to fix flapper again
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -5041,11 +5041,13 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T)
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
limit := uint64(1000)
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Retention: nats.InterestPolicy,
|
||||
MaxMsgs: 2000,
|
||||
MaxMsgs: int64(limit),
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
@@ -5065,10 +5067,7 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T)
|
||||
select {
|
||||
case <-pt.C:
|
||||
_, err := js.Publish("foo", []byte("BUG!"))
|
||||
if err != nil {
|
||||
t.Logf("Got a publisher error: %v", err)
|
||||
return
|
||||
}
|
||||
require_NoError(t, err)
|
||||
case <-qch:
|
||||
pt.Stop()
|
||||
return
|
||||
@@ -5121,26 +5120,28 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T)
|
||||
}()
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
// Make sure we have hit the limit for the number of messages we expected.
|
||||
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
if si.State.Msgs < limit {
|
||||
return fmt.Errorf("Not hit limit yet")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
close(qch)
|
||||
wg.Wait()
|
||||
|
||||
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
require_NoError(t, err)
|
||||
ci, err := js.ConsumerInfo("TEST", "dur")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
require_NoError(t, err)
|
||||
|
||||
ld := ci.Delivered.Stream
|
||||
if si.State.FirstSeq > ld {
|
||||
ld = si.State.FirstSeq - 1
|
||||
}
|
||||
if si.State.LastSeq-ld != ci.NumPending {
|
||||
return fmt.Errorf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending)
|
||||
np := ci.NumPending + uint64(ci.NumAckPending)
|
||||
if np != si.State.Msgs {
|
||||
return fmt.Errorf("Expected NumPending to be %d got %d", si.State.Msgs-uint64(ci.NumAckPending), ci.NumPending)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user