Fix for race and test for issue R.I. was seeing in nightly. Also fixed flappers.

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2021-01-24 20:20:05 -08:00
parent a72ddedb55
commit 117607ef11
4 changed files with 47 additions and 17 deletions

View File

@@ -912,7 +912,10 @@ func (o *Consumer) deleteNotActive() {
     ticker := time.NewTicker(time.Second)
     defer ticker.Stop()
     for range ticker.C {
-        if ca := js.consumerAssignment(acc, stream, name); ca != nil {
+        js.mu.RLock()
+        ca := js.consumerAssignment(acc, stream, name)
+        js.mu.RUnlock()
+        if ca != nil {
             s.Warnf("Consumer assignment not cleaned up, retrying")
             meta.ForwardProposal(removeEntry)
         } else {
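
Note on the hunk above, likely the race referenced in the commit message: js.consumerAssignment reads cluster state that other goroutines mutate under js.mu, so calling it without the read lock is a data race. The rewrite brackets only the lookup with RLock/RUnlock and releases the lock before acting on the result, so ForwardProposal never runs while js.mu is held. A minimal sketch of that pattern, using hypothetical stand-in types rather than the server's real ones:

package main

import "sync"

// Hypothetical stand-ins for the server's jetStream cluster state.
type consumerAssignment struct{ name string }

type jsState struct {
    mu          sync.RWMutex
    assignments map[string]*consumerAssignment
}

// lookup holds the read lock only for the duration of the read and
// releases it before the caller takes any follow-up action, mirroring
// the RLock/RUnlock bracket added around consumerAssignment above.
func (js *jsState) lookup(name string) *consumerAssignment {
    js.mu.RLock()
    ca := js.assignments[name]
    js.mu.RUnlock()
    return ca
}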
@@ -1053,9 +1056,9 @@ func (o *Consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
         n += binary.PutUvarint(b[n:], dc)
         n += binary.PutVarint(b[n:], ts)
         o.node.Propose(b[:n])
-    } else {
-        o.store.UpdateDelivered(dseq, sseq, dc, ts)
     }
+    // Update local state always.
+    o.store.UpdateDelivered(dseq, sseq, dc, ts)
 }
 
 // Lock should be held.
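
Before this change o.store.UpdateDelivered ran only on the non-clustered path (the deleted else branch): in clustered mode the delivered state was proposed through the Raft node but never written to the local store directly, so local state could lag what the consumer had actually delivered. Hoisting the call out of the else makes the local update unconditional, while the proposal still replicates it to peers.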

View File

@@ -171,7 +171,7 @@ type lps struct {
 }
 
 const (
-    minElectionTimeout = 350 * time.Millisecond
+    minElectionTimeout = 300 * time.Millisecond
     maxElectionTimeout = 3 * minElectionTimeout
     minCampaignTimeout = 50 * time.Millisecond
     maxCampaignTimeout = 4 * minCampaignTimeout
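
Since maxElectionTimeout is derived as 3 * minElectionTimeout, lowering the floor from 350ms to 300ms moves the election window from 350ms–1050ms to 300ms–900ms. The 900ms ceiling is what the test-side maxElectionTimeout constant near the end of this diff is updated to match (550ms becomes 900ms).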

View File

@@ -168,6 +168,26 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
     if ci == nil || ci.Name != "dlc" || ci.Stream != "TEST" {
         t.Fatalf("ConsumerInfo is not correct %+v", ci)
     }
+
+    // Now make sure that if we kill and restart the server that is the stream and consumer leader they return.
+    sl := c.streamLeader("$G", "TEST")
+    sl.Shutdown()
+    c.restartServer(sl)
+    c.waitOnNewStreamLeader("$G", "TEST")
+    si, err = js.StreamInfo("TEST")
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+    if si == nil || si.Config.Name != "TEST" {
+        t.Fatalf("StreamInfo is not correct %+v", si)
+    }
+
+    // Now durable consumer.
+    c.waitOnNewConsumerLeader("$G", "TEST", "dlc")
+    time.Sleep(100 * time.Millisecond)
+    if _, err = js.ConsumerInfo("TEST", "dlc"); err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
 }
 
 func TestJetStreamClusterMultiReplicaStreams(t *testing.T) {
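
The new kill-and-restart block exercises the cluster wait helpers (waitOnNewStreamLeader, waitOnNewConsumerLeader) whose poll and settle intervals are retuned later in this diff; the extra 100ms sleep before ConsumerInfo presumably gives the freshly elected consumer leader a moment to settle before it is queried.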
@@ -778,7 +798,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
     // For slight skew in creation time.
     ci.Created = ci.Created.Round(time.Second)
-    ci2.Created = ci2.Created.Round(time.Second)
+    ci2.Created = ci.Created
     ci.Cluster = nil
     ci2.Cluster = nil
@@ -1662,6 +1682,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
     }
 
     // Snapshot consumer info.
+    time.Sleep(100 * time.Millisecond)
     ci, err := jsub.ConsumerInfo()
     if err != nil {
         t.Fatalf("Unexpected error getting consumer info: %v", err)
@@ -1892,9 +1913,14 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
t.Fatalf("Should have been tracking 3 streams, found %d", stats.Streams)
}
expectedSize := 25*msgSize + 75*msgSize*2 + 10*msgSize*3
if stats.Store != expectedSize {
t.Fatalf("Expected store size to be %d, got %+v\n", expectedSize, stats)
}
// This may lag.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if stats.Store != expectedSize {
stats = accountStats()
return fmt.Errorf("Expected store size to be %d, got %+v\n", expectedSize, stats)
}
return nil
})
// Check limit enforcement.
if _, err := js.AddStream(&nats.StreamConfig{Name: "fail", Replicas: 3}); err == nil {
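
checkFor is the suite's polling helper: instead of failing on the first read, the assertion now retries for up to 2 seconds, refreshing stats via accountStats on each miss, since replicated account usage can lag briefly. A sketch of a checkFor-shaped helper consistent with this call site (not copied from the repo):

package server

import (
    "testing"
    "time"
)

// checkFor polls f every sleepDur until it returns nil or totalWait
// elapses, then fails the test with the last error observed.
func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
    t.Helper()
    deadline := time.Now().Add(totalWait)
    var err error
    for time.Now().Before(deadline) {
        if err = f(); err == nil {
            return
        }
        time.Sleep(sleepDur)
    }
    t.Fatalf("%v", err)
}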
@@ -2255,7 +2281,7 @@ func (c *cluster) waitOnNewConsumerLeader(account, stream, consumer string) {
     expires := time.Now().Add(10 * time.Second)
     for time.Now().Before(expires) {
         if leader := c.consumerLeader(account, stream, consumer); leader != nil {
-            time.Sleep(25 * time.Millisecond)
+            time.Sleep(50 * time.Millisecond)
             return
         }
         time.Sleep(100 * time.Millisecond)
@@ -2278,7 +2304,7 @@ func (c *cluster) waitOnNewStreamLeader(account, stream string) {
     expires := time.Now().Add(10 * time.Second)
     for time.Now().Before(expires) {
         if leader := c.streamLeader(account, stream); leader != nil {
-            time.Sleep(25 * time.Millisecond)
+            time.Sleep(50 * time.Millisecond)
             return
         }
         time.Sleep(100 * time.Millisecond)
@@ -2311,10 +2337,10 @@ func (c *cluster) waitOnStreamCurrent(s *server.Server, account, stream string)
     expires := time.Now().Add(10 * time.Second)
     for time.Now().Before(expires) {
         if s.JetStreamIsStreamCurrent(account, stream) {
-            time.Sleep(100 * time.Millisecond)
+            time.Sleep(50 * time.Millisecond)
             return
         }
-        time.Sleep(25 * time.Millisecond)
+        time.Sleep(100 * time.Millisecond)
     }
     c.t.Fatalf("Expected server %q to eventually be current for stream %q", s, stream)
 }
@@ -2324,10 +2350,10 @@ func (c *cluster) waitOnServerCurrent(s *server.Server) {
     expires := time.Now().Add(5 * time.Second)
     for time.Now().Before(expires) {
         if s.JetStreamIsCurrent() {
-            time.Sleep(100 * time.Millisecond)
+            time.Sleep(50 * time.Millisecond)
             return
         }
-        time.Sleep(25 * time.Millisecond)
+        time.Sleep(100 * time.Millisecond)
     }
     c.t.Fatalf("Expected server %q to eventually be current", s)
 }
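
Net effect of the four helper tweaks above: each wait helper now settles for 50ms after a successful check before returning and polls on a 100ms interval while waiting, where previously the two pairs used opposite values (25ms/100ms versus 100ms/25ms). The slower poll eases load on busy CI machines, and the uniform settle time keeps callers from racing the cluster immediately after a state change.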
@@ -2352,7 +2378,7 @@ func (c *cluster) leader() *server.Server {
 }
 
 // This needs to match raft.go:minElectionTimeout*2
-const maxElectionTimeout = 550 * time.Millisecond
+const maxElectionTimeout = 900 * time.Millisecond
 
 func (c *cluster) expectNoLeader() {
     c.t.Helper()

View File

@@ -6242,8 +6242,9 @@ func TestJetStreamConsumerReplayRate(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
gap := time.Since(start)
// 10ms is high but on macs time.Sleep(delay) does not sleep only delay.
gl, gh := gaps[i]-5*time.Millisecond, gaps[i]+10*time.Millisecond
// 15ms is high but on macs time.Sleep(delay) does not sleep only delay.
// Also on travis if things get bogged down this could be delayed.
gl, gh := gaps[i]-5*time.Millisecond, gaps[i]+15*time.Millisecond
if gap < gl || gap > gh {
t.Fatalf("Gap is off for %d, expected %v got %v", i, gaps[i], gap)
}
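
gl and gh bound the observed gap to [expected − 5ms, expected + 15ms]. Only the upper slack is widened because time.Sleep is guaranteed to sleep at least its delay and can only overshoot under load, as the updated comment notes, so the 5ms lower bound can stay tight.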