diff --git a/server/norace_test.go b/server/norace_test.go index 9fd8702d..787a6e5f 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5041,11 +5041,13 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() + limit := uint64(1000) + _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"foo"}, Retention: nats.InterestPolicy, - MaxMsgs: 2000, + MaxMsgs: int64(limit), Replicas: 3, }) require_NoError(t, err) @@ -5065,10 +5067,7 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) select { case <-pt.C: _, err := js.Publish("foo", []byte("BUG!")) - if err != nil { - t.Logf("Got a publisher error: %v", err) - return - } + require_NoError(t, err) case <-qch: pt.Stop() return @@ -5121,26 +5120,28 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) }() } - time.Sleep(5 * time.Second) + // Make sure we have hit the limit for the number of messages we expected. + checkFor(t, 20*time.Second, 500*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.State.Msgs < limit { + return fmt.Errorf("Not hit limit yet") + } + return nil + }) + close(qch) wg.Wait() checkFor(t, 20*time.Second, 500*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") - if err != nil { - return err - } + require_NoError(t, err) ci, err := js.ConsumerInfo("TEST", "dur") - if err != nil { - return err - } + require_NoError(t, err) - ld := ci.Delivered.Stream - if si.State.FirstSeq > ld { - ld = si.State.FirstSeq - 1 - } - if si.State.LastSeq-ld != ci.NumPending { - return fmt.Errorf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending) + np := ci.NumPending + uint64(ci.NumAckPending) + if np != si.State.Msgs { + return fmt.Errorf("Expected NumPending to be %d got %d", si.State.Msgs-uint64(ci.NumAckPending), ci.NumPending) } return nil })