From 34650e9dd535ff13efe5f19bab1620ac83803936 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 31 Mar 2022 10:05:34 -0600 Subject: [PATCH] Fixed data race and some flappers Data race that has been seen: ``` Read at 0x00c00134bec0 by goroutine 159: github.com/nats-io/nats-server/v2/server.(*client).msgHeaderForRouteOrLeaf() /home/travis/gopath/src/github.com/nats-io/nats-server/server/client.go:2935 +0x254 github.com/nats-io/nats-server/v2/server.(*client).processMsgResults() /home/travis/gopath/src/github.com/nats-io/nats-server/server/client.go:4364 +0x2147 (...) Previous write at 0x00c00134bec0 by goroutine 201: github.com/nats-io/nats-server/v2/server.(*Server).addRoute() /home/travis/gopath/src/github.com/nats-io/nats-server/server/route.go:1475 +0xdb4 github.com/nats-io/nats-server/v2/server.(*client).processRouteInfo() /home/travis/gopath/src/github.com/nats-io/nats-server/server/route.go:641 +0x1704 ``` Also fixed some flappers and removed use of `s.js.` since we have already captured `js` in Jsz monitoring. Signed-off-by: Ivan Kozlovic --- .travis.yml | 4 +-- server/client.go | 13 +++++++--- server/filestore_test.go | 43 +++++++++++++++++++------------- server/jetstream_cluster_test.go | 15 ++++++----- server/monitor.go | 12 ++++----- 5 files changed, 51 insertions(+), 36 deletions(-) diff --git a/.travis.yml b/.travis.yml index c323fc86..8860dca9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,8 +29,8 @@ before_script: script: - set -e - if [[ $TRAVIS_TAG ]]; then go test -v -run=TestVersionMatchesTag ./server; fi -- if [[ ! $TRAVIS_TAG ]]; then go test -v -run=TestNoRace --failfast -p=1 -timeout 20m ./...; fi -- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast -timeout 20m ./...; fi; fi +- if [[ ! $TRAVIS_TAG ]]; then go test -v -run=TestNoRace --failfast -p=1 -timeout 30m ./...; fi +- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast -timeout 30m ./...; fi; fi - set +e after_success: - if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi; fi diff --git a/server/client.go b/server/client.go index 1c9ab73a..4e563909 100644 --- a/server/client.go +++ b/server/client.go @@ -2922,21 +2922,26 @@ func (c *client) checkDenySub(subject string) bool { // Create a message header for routes or leafnodes. Header and origin cluster aware. func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, acc *Account) []byte { hasHeader := c.pa.hdr > 0 - canReceiveHeader := rt.sub.client.headers + subclient := rt.sub.client + canReceiveHeader := subclient.headers mh := c.msgb[:msgHeadProtoLen] - kind := rt.sub.client.kind + kind := subclient.kind var lnoc bool if kind == ROUTER { // If we are coming from a leaf with an origin cluster we need to handle differently // if we can. We will send a route based LMSG which has origin cluster and headers // by default. - if c.kind == LEAF && c.remoteCluster() != _EMPTY_ && rt.sub.client.route.lnoc { + if c.kind == LEAF && c.remoteCluster() != _EMPTY_ { + subclient.mu.Lock() + lnoc = subclient.route.lnoc + subclient.mu.Unlock() + } + if lnoc { mh[0] = 'L' mh = append(mh, c.remoteCluster()...) mh = append(mh, ' ') - lnoc = true } else { // Router (and Gateway) nodes are RMSG. Set here since leafnodes may rewrite. mh[0] = 'R' diff --git a/server/filestore_test.go b/server/filestore_test.go index 604f1592..dd0d464b 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1190,26 +1190,33 @@ func TestFileStoreBitRot(t *testing.T) { t.Fatalf("Expected to have no corrupt msgs, got %d", len(ld.Msgs)) } - // Now twiddle some bits. - fs.mu.Lock() - lmb := fs.lmb - contents, _ := ioutil.ReadFile(lmb.mfn) - var index int - for { - index = rand.Intn(len(contents)) - // Reverse one byte anywhere. - b := contents[index] - contents[index] = bits.Reverse8(b) - if b != contents[index] { + for i := 0; i < 10; i++ { + // Now twiddle some bits. + fs.mu.Lock() + lmb := fs.lmb + contents, _ := ioutil.ReadFile(lmb.mfn) + var index int + for { + index = rand.Intn(len(contents)) + // Reverse one byte anywhere. + b := contents[index] + contents[index] = bits.Reverse8(b) + if b != contents[index] { + break + } + } + ioutil.WriteFile(lmb.mfn, contents, 0644) + fs.mu.Unlock() + + ld := fs.checkMsgs() + if len(ld.Msgs) > 0 { break } - } - ioutil.WriteFile(lmb.mfn, contents, 0644) - fs.mu.Unlock() - - ld := fs.checkMsgs() - if ld == nil || len(ld.Msgs) == 0 { - t.Fatalf("Expected to have corrupt msgs got none: changed [%d]", index) + // Fail the test if we have tried the 10 times and still did not + // get any corruption report. + if i == 9 { + t.Fatalf("Expected to have corrupt msgs got none: changed [%d]", index) + } } // Make sure we can restore. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2795583a..534e05a1 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10068,7 +10068,7 @@ func TestJetStreamClusterConsumerPendingBug(t *testing.T) { } } // Wait for them to all be there. - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("foo") require_NoError(t, err) if si.State.Msgs != uint64(n) { @@ -10082,14 +10082,17 @@ func TestJetStreamClusterConsumerPendingBug(t *testing.T) { if err != nil { t.Fatalf("Error creating consumer: %v", err) } - ci, err := js.ConsumerInfo("foo", "dlc") - require_NoError(t, err) - if ci.NumPending != uint64(n) { - t.Fatalf("Expected NumPending to be %d, got %d", n, ci.NumPending) - } case <-time.After(5 * time.Second): t.Fatalf("Timed out?") } + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ci, err := js.ConsumerInfo("foo", "dlc") + require_NoError(t, err) + if ci.NumPending != uint64(n) { + return fmt.Errorf("Expected NumPending to be %d, got %d", n, ci.NumPending) + } + return nil + }) } func TestJetStreamClusterPullPerf(t *testing.T) { diff --git a/server/monitor.go b/server/monitor.go index 222ea81f..7f1f7940 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2635,14 +2635,14 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { var accounts []*jsAccount - s.js.mu.RLock() - jsi.Config = s.js.config - for _, info := range s.js.accounts { + js.mu.RLock() + jsi.Config = js.config + for _, info := range js.accounts { accounts = append(accounts, info) } - s.js.mu.RUnlock() + js.mu.RUnlock() - if mg := s.js.getMetaGroup(); mg != nil { + if mg := js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()} if isLeader { @@ -2651,7 +2651,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { } } - jsi.JetStreamStats = *s.js.usageStats() + jsi.JetStreamStats = *js.usageStats() filterIdx := -1 for i, jsa := range accounts {