[FIXED] Account resolver lock inversion

There was a lock inversion, but it was low risk since it happened during
server initialization. Still fixed it and added the ordering
to the locksordering.txt file.

Also fixed multiple lock inversions that were caused by tests.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-09-25 15:09:11 -06:00
parent 83cc80ab74
commit a84ce61a93
5 changed files with 56 additions and 44 deletions

View File

@@ -14,3 +14,5 @@ allow that lock to be held and the acquire a client lock which is not
possible with the normal account lock. possible with the normal account lock.
accountLeafList -> client accountLeafList -> client
AccountResolver interface has various implementations, but assume: AccountResolver -> Server

View File

@@ -2734,25 +2734,6 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) {
} }
} }
var jsClusterAccountLimitsTempl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
no_auth_user: js
accounts {
$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) { func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) {
// Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader. // Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader.
c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false) c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false)
@@ -5552,10 +5533,8 @@ func TestJetStreamClusterConsumerOverrides(t *testing.T) {
o := mset.lookupConsumer("m") o := mset.lookupConsumer("m")
require_True(t, o != nil) require_True(t, o != nil)
o.mu.RLock()
st := o.store.Type() st := o.store.Type()
n := o.raftNode() n := o.raftNode()
o.mu.RUnlock()
require_True(t, n != nil) require_True(t, n != nil)
rn := n.(*raft) rn := n.(*raft)
rn.RLock() rn.RLock()

View File

@@ -1372,7 +1372,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T)
} }
// https://github.com/nats-io/nats-server/issues/3677 // https://github.com/nats-io/nats-server/issues/3677
func TestJetStreamParallelStreamCreation(t *testing.T) { func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3) c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown() defer c.shutdown()
@@ -1414,7 +1414,7 @@ func TestJetStreamParallelStreamCreation(t *testing.T) {
// In addition to test above, if streams were attempted to be created in parallel // In addition to test above, if streams were attempted to be created in parallel
// it could be that multiple raft groups would be created for the same asset. // it could be that multiple raft groups would be created for the same asset.
func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) { func TestJetStreamClusterParallelStreamCreationDupeRaftGroups(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3) c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown() defer c.shutdown()
@@ -1463,19 +1463,19 @@ func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
expected := 2 expected := 2
rg := make(map[string]struct{}) rg := make(map[string]struct{})
for _, s := range c.servers { for _, s := range c.servers {
s.mu.RLock() s.rnMu.RLock()
for _, ni := range s.raftNodes { for _, ni := range s.raftNodes {
n := ni.(*raft) n := ni.(*raft)
rg[n.Group()] = struct{}{} rg[n.Group()] = struct{}{}
} }
s.mu.RUnlock() s.rnMu.RUnlock()
} }
if len(rg) != expected { if len(rg) != expected {
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg)) t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
} }
} }
func TestJetStreamParallelConsumerCreation(t *testing.T) { func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3) c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown() defer c.shutdown()
@@ -1538,19 +1538,19 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
expected := 3 expected := 3
rg := make(map[string]struct{}) rg := make(map[string]struct{})
for _, s := range c.servers { for _, s := range c.servers {
s.mu.RLock() s.rnMu.RLock()
for _, ni := range s.raftNodes { for _, ni := range s.raftNodes {
n := ni.(*raft) n := ni.(*raft)
rg[n.Group()] = struct{}{} rg[n.Group()] = struct{}{}
} }
s.mu.RUnlock() s.rnMu.RUnlock()
} }
if len(rg) != expected { if len(rg) != expected {
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg)) t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
} }
} }
func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) { func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3) c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown() defer c.shutdown()
@@ -3470,19 +3470,20 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
}() }()
defer close(qch) defer close(qch)
s.mu.RLock()
gacc := s.gacc
s.mu.RUnlock()
if gacc == nil {
t.Fatalf("No global account")
}
// Make sure we do not have any R1 assets placed on the lameduck server. // Make sure we do not have any R1 assets placed on the lameduck server.
for s.isRunning() { for s.isRunning() {
s.mu.RLock() if len(gacc.streams()) > 0 {
if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
s.mu.RUnlock()
break
}
hasAsset := len(s.js.srv.gacc.streams()) > 0
s.mu.RUnlock()
if hasAsset {
t.Fatalf("Server had an R1 asset when it should not due to lameduck mode") t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
} }
time.Sleep(15 * time.Millisecond)
} }
s.WaitForShutdown()
} }
// If a consumer has not been registered (possible in heavily loaded systems with lots of assets) // If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
@@ -3999,9 +4000,14 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
sa.Group.Cluster = _EMPTY_ sa.Group.Cluster = _EMPTY_
sa.Group.Preferred = _EMPTY_ sa.Group.Preferred = _EMPTY_
// Insert into meta layer. // Insert into meta layer.
s.mu.RLock() if sjs := s.getJetStream(); sjs != nil {
s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa)) sjs.mu.RLock()
s.mu.RUnlock() meta := sjs.cluster.meta
sjs.mu.RUnlock()
if meta != nil {
meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
}
}
// Make sure it got propagated.. // Make sure it got propagated..
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
sa := mset.streamAssignment().copyGroup() sa := mset.streamAssignment().copyGroup()
@@ -4714,7 +4720,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
require_True(t, si.State.Msgs == uint64(toSend)) require_True(t, si.State.Msgs == uint64(toSend))
} }
func TestJetStreamBinaryStreamSnapshotCapability(t *testing.T) { func TestJetStreamClusterBinaryStreamSnapshotCapability(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3) c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown() defer c.shutdown()
@@ -4783,7 +4789,7 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
} }
} }
func TestJetStreamAccountUsageDrifts(t *testing.T) { func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
tmpl := ` tmpl := `
listen: 127.0.0.1:-1 listen: 127.0.0.1:-1
server_name: %s server_name: %s

View File

@@ -281,6 +281,25 @@ var jsMixedModeGlobalAccountTempl = `
var jsGWTempl = `%s{name: %s, urls: [%s]}` var jsGWTempl = `%s{name: %s, urls: [%s]}`
var jsClusterAccountLimitsTempl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
no_auth_user: js
accounts {
$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster { func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster {
return createJetStreamTaggedSuperClusterWithGWProxy(t, nil) return createJetStreamTaggedSuperClusterWithGWProxy(t, nil)
} }

View File

@@ -1272,6 +1272,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
// Setup the account resolver. For memory resolver, make sure the JWTs are // Setup the account resolver. For memory resolver, make sure the JWTs are
// properly formed but do not enforce expiration etc. // properly formed but do not enforce expiration etc.
// Lock is held on entry, but may be released/reacquired during this call.
func (s *Server) configureResolver() error { func (s *Server) configureResolver() error {
opts := s.getOpts() opts := s.getOpts()
s.accResolver = opts.AccountResolver s.accResolver = opts.AccountResolver
@@ -1286,7 +1287,12 @@ func (s *Server) configureResolver() error {
} }
} }
if len(opts.resolverPreloads) > 0 { if len(opts.resolverPreloads) > 0 {
if s.accResolver.IsReadOnly() { // Lock ordering is account resolver -> server, so we need to release
// the lock and reacquire it when done with account resolver's calls.
ar := s.accResolver
s.mu.Unlock()
defer s.mu.Lock()
if ar.IsReadOnly() {
return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR") return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
} }
for k, v := range opts.resolverPreloads { for k, v := range opts.resolverPreloads {
@@ -1294,7 +1300,7 @@ func (s *Server) configureResolver() error {
if err != nil { if err != nil {
return fmt.Errorf("preload account error for %q: %v", k, err) return fmt.Errorf("preload account error for %q: %v", k, err)
} }
s.accResolver.Store(k, v) ar.Store(k, v)
} }
} }
} }