Fix for a flapper

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-12 17:19:30 -07:00
parent 3407eda769
commit 58b5fc4abf

View File

@@ -3769,15 +3769,11 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {
defer sub.Unsubscribe()
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != uint64(numRequests) {
t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
}
require_NoError(t, err)
require_True(t, si.State.Msgs == uint64(numRequests))
// Let settle a bit for Go routine checks.
time.Sleep(250 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
// Grab number go routines.
base := runtime.NumGoroutine()
@@ -3799,11 +3795,9 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {
// Wait til we have the consumer leader re-elected.
c.waitOnConsumerLeader("$G", "TEST", "d1")
// So we do not wait all 10s in each call to ConsumerInfo.
js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
// Make sure we can get the consumer info eventually.
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
_, err := js2.ConsumerInfo("TEST", "d1")
_, err := js.ConsumerInfo("TEST", "d1", nats.MaxWait(250*time.Millisecond))
return err
})
@@ -3815,8 +3809,13 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {
})
// Simulate a low level write error on our consumer and make sure we can recover etc.
cl = c.consumerLeader("$G", "TEST", "d1")
require_True(t, cl != nil)
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
if cl = c.consumerLeader("$G", "TEST", "d1"); cl != nil {
return nil
}
return errors.New("waiting on consumer leader")
})
mset, err = cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)