diff --git a/server/consumer.go b/server/consumer.go index d37540f5..5f313213 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -912,7 +912,10 @@ func (o *Consumer) deleteNotActive() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { - if ca := js.consumerAssignment(acc, stream, name); ca != nil { + js.mu.RLock() + ca := js.consumerAssignment(acc, stream, name) + js.mu.RUnlock() + if ca != nil { s.Warnf("Consumer assignment not cleaned up, retrying") meta.ForwardProposal(removeEntry) } else { @@ -1053,9 +1056,9 @@ func (o *Consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) { n += binary.PutUvarint(b[n:], dc) n += binary.PutVarint(b[n:], ts) o.node.Propose(b[:n]) - } else { - o.store.UpdateDelivered(dseq, sseq, dc, ts) } + // Update local state always. + o.store.UpdateDelivered(dseq, sseq, dc, ts) } // Lock should be held. diff --git a/server/raft.go b/server/raft.go index f4cb14cb..d6f918ae 100644 --- a/server/raft.go +++ b/server/raft.go @@ -171,7 +171,7 @@ type lps struct { } const ( - minElectionTimeout = 350 * time.Millisecond + minElectionTimeout = 300 * time.Millisecond maxElectionTimeout = 3 * minElectionTimeout minCampaignTimeout = 50 * time.Millisecond maxCampaignTimeout = 4 * minCampaignTimeout diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 3f8f06e6..a71d3c28 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -168,6 +168,26 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { if ci == nil || ci.Name != "dlc" || ci.Stream != "TEST" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } + + // Now make sure that if we kill and restart the server that is the stream and consumer leader they return. + sl := c.streamLeader("$G", "TEST") + sl.Shutdown() + c.restartServer(sl) + c.waitOnNewStreamLeader("$G", "TEST") + + si, err = js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si == nil || si.Config.Name != "TEST" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + // Now durable consumer. + c.waitOnNewConsumerLeader("$G", "TEST", "dlc") + time.Sleep(100 * time.Millisecond) + if _, err = js.ConsumerInfo("TEST", "dlc"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } } func TestJetStreamClusterMultiReplicaStreams(t *testing.T) { @@ -778,7 +798,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { // For slight skew in creation time. ci.Created = ci.Created.Round(time.Second) - ci2.Created = ci2.Created.Round(time.Second) + ci2.Created = ci.Created ci.Cluster = nil ci2.Cluster = nil @@ -1662,6 +1682,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { } // Snapshot consumer info. + time.Sleep(100 * time.Millisecond) ci, err := jsub.ConsumerInfo() if err != nil { t.Fatalf("Unexpected error getting consumer info: %v", err) @@ -1892,9 +1913,14 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) { t.Fatalf("Should have been tracking 3 streams, found %d", stats.Streams) } expectedSize := 25*msgSize + 75*msgSize*2 + 10*msgSize*3 - if stats.Store != expectedSize { - t.Fatalf("Expected store size to be %d, got %+v\n", expectedSize, stats) - } + // This may lag. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if stats.Store != expectedSize { + stats = accountStats() + return fmt.Errorf("Expected store size to be %d, got %+v\n", expectedSize, stats) + } + return nil + }) // Check limit enforcement. if _, err := js.AddStream(&nats.StreamConfig{Name: "fail", Replicas: 3}); err == nil { @@ -2255,7 +2281,7 @@ func (c *cluster) waitOnNewConsumerLeader(account, stream, consumer string) { expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { if leader := c.consumerLeader(account, stream, consumer); leader != nil { - time.Sleep(25 * time.Millisecond) + time.Sleep(50 * time.Millisecond) return } time.Sleep(100 * time.Millisecond) @@ -2278,7 +2304,7 @@ func (c *cluster) waitOnNewStreamLeader(account, stream string) { expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { if leader := c.streamLeader(account, stream); leader != nil { - time.Sleep(25 * time.Millisecond) + time.Sleep(50 * time.Millisecond) return } time.Sleep(100 * time.Millisecond) @@ -2311,10 +2337,10 @@ func (c *cluster) waitOnStreamCurrent(s *server.Server, account, stream string) expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { if s.JetStreamIsStreamCurrent(account, stream) { - time.Sleep(100 * time.Millisecond) + time.Sleep(50 * time.Millisecond) return } - time.Sleep(25 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } c.t.Fatalf("Expected server %q to eventually be current for stream %q", s, stream) } @@ -2324,10 +2350,10 @@ func (c *cluster) waitOnServerCurrent(s *server.Server) { expires := time.Now().Add(5 * time.Second) for time.Now().Before(expires) { if s.JetStreamIsCurrent() { - time.Sleep(100 * time.Millisecond) + time.Sleep(50 * time.Millisecond) return } - time.Sleep(25 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } c.t.Fatalf("Expected server %q to eventually be current", s) } @@ -2352,7 +2378,7 @@ func (c *cluster) leader() *server.Server { } // This needs to match raft.go:minElectionTimeout*2 -const maxElectionTimeout = 550 * time.Millisecond +const maxElectionTimeout = 900 * time.Millisecond func (c *cluster) expectNoLeader() { c.t.Helper() diff --git a/test/jetstream_test.go b/test/jetstream_test.go index c01fcc18..9e0094cd 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -6242,8 +6242,9 @@ func TestJetStreamConsumerReplayRate(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } gap := time.Since(start) - // 10ms is high but on macs time.Sleep(delay) does not sleep only delay. - gl, gh := gaps[i]-5*time.Millisecond, gaps[i]+10*time.Millisecond + // 15ms is high but on macs time.Sleep(delay) does not sleep only delay. + // Also on travis if things get bogged down this could be delayed. + gl, gh := gaps[i]-5*time.Millisecond, gaps[i]+15*time.Millisecond if gap < gl || gap > gh { t.Fatalf("Gap is off for %d, expected %v got %v", i, gaps[i], gap) }