From 8fe8b835fe230e457f7ac938a2c4d93187d301a3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 23 Feb 2021 14:08:17 -0800 Subject: [PATCH] Fixes for flapping tests Signed-off-by: Derek Collison --- .travis.yml | 6 +++--- server/jetstream_cluster.go | 5 +++-- server/jetstream_cluster_test.go | 5 ++++- server/raft.go | 4 ++++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 19a8c783..b9c1c3c9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,8 +3,8 @@ dist: focal language: go go: +- 1.16.x - 1.15.x -- 1.14.x addons: apt: packages: @@ -29,7 +29,7 @@ script: - set -e - go test -i ./... - go test -v -run=TestNoRace --failfast -p=1 ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi - set +e deploy: @@ -38,4 +38,4 @@ deploy: script: curl -sL http://git.io/goreleaser | bash on: tags: true - condition: $TRAVIS_GO_VERSION =~ 1.14 + condition: $TRAVIS_GO_VERSION =~ 1.16 diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 637893dc..12ca46af 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2041,6 +2041,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, } js.mu.RLock() s := js.srv + hadLeader := sa.Group.node == nil || sa.Group.node.GroupLeader() != noLeader js.mu.RUnlock() acc, err := s.LookupAccount(sa.Client.serviceAccount()) @@ -2056,14 +2057,14 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, if err != nil { resp.Error = jsNotFoundError(err) } else if mset != nil { - err = mset.stop(true, wasLeader) + err = mset.stop(true, false) } if sa.Group.node != nil { sa.Group.node.Delete() } - if !isMember || !wasLeader && sa.Group.node != nil && sa.Group.node.GroupLeader() != noLeader { + if !isMember || !wasLeader && hadLeader { return } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 9b0d2361..5494ee63 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -821,6 +821,7 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { c.stopAll() c.restartAll() + c.waitOnLeader() s = c.randomServer() nc, js = jsClientConnect(t, s) @@ -2048,6 +2049,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { } } nc.Flush() + time.Sleep(200 * time.Millisecond) // Snapshot consumer info. ci, err := jsub.ConsumerInfo() @@ -2819,6 +2821,7 @@ func TestJetStreamClusterCreateResponseAdvisoriesHaveSubject(t *testing.T) { } checkSubsPending(t, sub, 6) + for m, err := sub.NextMsg(0); err == nil; m, err = sub.NextMsg(0) { var audit JSAPIAudit if err := json.Unmarshal(m.Data, &audit); err != nil { @@ -4217,7 +4220,7 @@ func jsClientConnect(t *testing.T, s *Server) (*nats.Conn, nats.JetStreamContext func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() - checkFor(t, 3*time.Second, 10*time.Millisecond, func() error { + checkFor(t, 4*time.Second, 20*time.Millisecond, func() error { if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) } diff --git a/server/raft.go b/server/raft.go index e5d3cbb4..e351abfb 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1449,6 +1449,10 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st func (n *raft) runAsLeader() { n.RLock() + if n.state == Closed { + n.RUnlock() + return + } psubj, rpsubj := n.psubj, n.rpsubj n.RUnlock()