More fixes for some flapping tests

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-08-14 13:35:04 -07:00
parent 14572b080b
commit b6c0412d2a
2 changed files with 22 additions and 20 deletions

View File

@@ -7900,22 +7900,31 @@ func TestJetStreamClusterConsumerLastActiveReporting(t *testing.T) {
t.Fatalf("Expected last to be nil by default, got %+v", ci)
}
checkTimeDiff := func(t1, t2 *time.Time) {
t.Helper()
// Compare on a seconds level
rt1, rt2 := t1.UTC().Round(time.Second), t2.UTC().Round(time.Second)
if rt1 != rt2 {
d := rt1.Sub(rt2)
if d > time.Second || d < -time.Second {
t.Fatalf("Times differ too much, expected %v got %v", rt1, rt2)
}
}
}
checkDelivered := func(name string) {
t.Helper()
now := time.Now().UTC().Round(time.Second)
now := time.Now()
ci := consumerInfo(name)
if ci.Delivered.Last == nil {
t.Fatalf("Expected delivered last to not be nil after activity, got %+v", ci.Delivered)
}
// Compare on a seconds level
if ldt := ci.Delivered.Last.Round(time.Second); now != ldt {
t.Fatalf("Last active time is off, expected %v got %v", now, ldt)
}
checkTimeDiff(&now, ci.Delivered.Last)
}
checkLastAck := func(name string, m *nats.Msg) {
t.Helper()
now := time.Now().UTC().Round(time.Second)
now := time.Now()
if err := m.AckSync(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -7924,9 +7933,7 @@ func TestJetStreamClusterConsumerLastActiveReporting(t *testing.T) {
t.Fatalf("Expected ack floor last to not be nil after ack, got %+v", ci.AckFloor)
}
// Compare on a seconds level
if lat := ci.AckFloor.Last.Round(time.Second); now != lat {
t.Fatalf("Last ack time is off, expected %v got %v", now, lat)
}
checkTimeDiff(&now, ci.AckFloor.Last)
}
checkAck := func(name string) {
@@ -7972,14 +7979,9 @@ func TestJetStreamClusterConsumerLastActiveReporting(t *testing.T) {
if nci.AckFloor.Last == nil {
t.Fatalf("Expected ack floor last to not be nil, got %+v", nci.AckFloor)
}
ldt, nldt := ci.Delivered.Last.Round(time.Second), nci.Delivered.Last.Round(time.Second)
if nldt != ldt {
t.Fatalf("Expected delivery times after leader transfer to be the same, %v vs %v", ldt, nldt)
}
lat, nlat := ci.AckFloor.Last.Round(time.Second), nci.AckFloor.Last.Round(time.Second)
if nlat != lat {
t.Fatalf("Expected ack floor times after leader transfer to be the same, %v vs %v", lat, nlat)
}
checkTimeDiff(ci.Delivered.Last, nci.Delivered.Last)
checkTimeDiff(ci.AckFloor.Last, nci.AckFloor.Last)
}
// Support functions
@@ -8742,7 +8744,7 @@ func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) {
expires := time.Now().Add(20 * time.Second)
for time.Now().Before(expires) {
if leader := c.consumerLeader(account, stream, consumer); leader != nil {
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
return
}
time.Sleep(100 * time.Millisecond)
@@ -8765,7 +8767,7 @@ func (c *cluster) waitOnStreamLeader(account, stream string) {
expires := time.Now().Add(30 * time.Second)
for time.Now().Before(expires) {
if leader := c.streamLeader(account, stream); leader != nil {
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
return
}
time.Sleep(100 * time.Millisecond)

View File

@@ -5944,7 +5944,7 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
// Wait for all messsages to be pending for each sub.
for i, sub := range []*nats.Subscription{sub1, sub2, sub3} {
checkFor(t, 500*time.Millisecond, 25*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); nmsgs != totalMsgs {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d for sub %d", nmsgs, totalMsgs, i+1)
}