mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed a data race and some flapping tests
Data race that has been seen:
```
Read at 0x00c00134bec0 by goroutine 159:
github.com/nats-io/nats-server/v2/server.(*client).msgHeaderForRouteOrLeaf()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/client.go:2935 +0x254
github.com/nats-io/nats-server/v2/server.(*client).processMsgResults()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/client.go:4364 +0x2147
(...)
Previous write at 0x00c00134bec0 by goroutine 201:
github.com/nats-io/nats-server/v2/server.(*Server).addRoute()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/route.go:1475 +0xdb4
github.com/nats-io/nats-server/v2/server.(*client).processRouteInfo()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/route.go:641 +0x1704
```
Also fixed some flapping tests, and removed uses of `s.js.` since `js` has
already been captured locally in the Jsz monitoring code.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -29,8 +29,8 @@ before_script:
|
||||
script:
|
||||
- set -e
|
||||
- if [[ $TRAVIS_TAG ]]; then go test -v -run=TestVersionMatchesTag ./server; fi
|
||||
- if [[ ! $TRAVIS_TAG ]]; then go test -v -run=TestNoRace --failfast -p=1 -timeout 20m ./...; fi
|
||||
- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast -timeout 20m ./...; fi; fi
|
||||
- if [[ ! $TRAVIS_TAG ]]; then go test -v -run=TestNoRace --failfast -p=1 -timeout 30m ./...; fi
|
||||
- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast -timeout 30m ./...; fi; fi
|
||||
- set +e
|
||||
after_success:
|
||||
- if [[ ! $TRAVIS_TAG ]]; then if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi; fi
|
||||
|
||||
@@ -2922,21 +2922,26 @@ func (c *client) checkDenySub(subject string) bool {
|
||||
// Create a message header for routes or leafnodes. Header and origin cluster aware.
|
||||
func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, acc *Account) []byte {
|
||||
hasHeader := c.pa.hdr > 0
|
||||
canReceiveHeader := rt.sub.client.headers
|
||||
subclient := rt.sub.client
|
||||
canReceiveHeader := subclient.headers
|
||||
|
||||
mh := c.msgb[:msgHeadProtoLen]
|
||||
kind := rt.sub.client.kind
|
||||
kind := subclient.kind
|
||||
var lnoc bool
|
||||
|
||||
if kind == ROUTER {
|
||||
// If we are coming from a leaf with an origin cluster we need to handle differently
|
||||
// if we can. We will send a route based LMSG which has origin cluster and headers
|
||||
// by default.
|
||||
if c.kind == LEAF && c.remoteCluster() != _EMPTY_ && rt.sub.client.route.lnoc {
|
||||
if c.kind == LEAF && c.remoteCluster() != _EMPTY_ {
|
||||
subclient.mu.Lock()
|
||||
lnoc = subclient.route.lnoc
|
||||
subclient.mu.Unlock()
|
||||
}
|
||||
if lnoc {
|
||||
mh[0] = 'L'
|
||||
mh = append(mh, c.remoteCluster()...)
|
||||
mh = append(mh, ' ')
|
||||
lnoc = true
|
||||
} else {
|
||||
// Router (and Gateway) nodes are RMSG. Set here since leafnodes may rewrite.
|
||||
mh[0] = 'R'
|
||||
|
||||
@@ -1190,26 +1190,33 @@ func TestFileStoreBitRot(t *testing.T) {
|
||||
t.Fatalf("Expected to have no corrupt msgs, got %d", len(ld.Msgs))
|
||||
}
|
||||
|
||||
// Now twiddle some bits.
|
||||
fs.mu.Lock()
|
||||
lmb := fs.lmb
|
||||
contents, _ := ioutil.ReadFile(lmb.mfn)
|
||||
var index int
|
||||
for {
|
||||
index = rand.Intn(len(contents))
|
||||
// Reverse one byte anywhere.
|
||||
b := contents[index]
|
||||
contents[index] = bits.Reverse8(b)
|
||||
if b != contents[index] {
|
||||
for i := 0; i < 10; i++ {
|
||||
// Now twiddle some bits.
|
||||
fs.mu.Lock()
|
||||
lmb := fs.lmb
|
||||
contents, _ := ioutil.ReadFile(lmb.mfn)
|
||||
var index int
|
||||
for {
|
||||
index = rand.Intn(len(contents))
|
||||
// Reverse one byte anywhere.
|
||||
b := contents[index]
|
||||
contents[index] = bits.Reverse8(b)
|
||||
if b != contents[index] {
|
||||
break
|
||||
}
|
||||
}
|
||||
ioutil.WriteFile(lmb.mfn, contents, 0644)
|
||||
fs.mu.Unlock()
|
||||
|
||||
ld := fs.checkMsgs()
|
||||
if len(ld.Msgs) > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
ioutil.WriteFile(lmb.mfn, contents, 0644)
|
||||
fs.mu.Unlock()
|
||||
|
||||
ld := fs.checkMsgs()
|
||||
if ld == nil || len(ld.Msgs) == 0 {
|
||||
t.Fatalf("Expected to have corrupt msgs got none: changed [%d]", index)
|
||||
// Fail the test if we have tried the 10 times and still did not
|
||||
// get any corruption report.
|
||||
if i == 9 {
|
||||
t.Fatalf("Expected to have corrupt msgs got none: changed [%d]", index)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we can restore.
|
||||
|
||||
@@ -10068,7 +10068,7 @@ func TestJetStreamClusterConsumerPendingBug(t *testing.T) {
|
||||
}
|
||||
}
|
||||
// Wait for them to all be there.
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("foo")
|
||||
require_NoError(t, err)
|
||||
if si.State.Msgs != uint64(n) {
|
||||
@@ -10082,14 +10082,17 @@ func TestJetStreamClusterConsumerPendingBug(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating consumer: %v", err)
|
||||
}
|
||||
ci, err := js.ConsumerInfo("foo", "dlc")
|
||||
require_NoError(t, err)
|
||||
if ci.NumPending != uint64(n) {
|
||||
t.Fatalf("Expected NumPending to be %d, got %d", n, ci.NumPending)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Timed out?")
|
||||
}
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
ci, err := js.ConsumerInfo("foo", "dlc")
|
||||
require_NoError(t, err)
|
||||
if ci.NumPending != uint64(n) {
|
||||
return fmt.Errorf("Expected NumPending to be %d, got %d", n, ci.NumPending)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterPullPerf(t *testing.T) {
|
||||
|
||||
@@ -2635,14 +2635,14 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
|
||||
var accounts []*jsAccount
|
||||
|
||||
s.js.mu.RLock()
|
||||
jsi.Config = s.js.config
|
||||
for _, info := range s.js.accounts {
|
||||
js.mu.RLock()
|
||||
jsi.Config = js.config
|
||||
for _, info := range js.accounts {
|
||||
accounts = append(accounts, info)
|
||||
}
|
||||
s.js.mu.RUnlock()
|
||||
js.mu.RUnlock()
|
||||
|
||||
if mg := s.js.getMetaGroup(); mg != nil {
|
||||
if mg := js.getMetaGroup(); mg != nil {
|
||||
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
|
||||
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Size: mg.ClusterSize()}
|
||||
if isLeader {
|
||||
@@ -2651,7 +2651,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
}
|
||||
}
|
||||
|
||||
jsi.JetStreamStats = *s.js.usageStats()
|
||||
jsi.JetStreamStats = *js.usageStats()
|
||||
|
||||
filterIdx := -1
|
||||
for i, jsa := range accounts {
|
||||
|
||||
Reference in New Issue
Block a user