mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[fixed] reload related races (#3222)
account.rm had races caused by reload copying rm from one account to another mset.store was used outsisde the lock in rare cases the stasz message was not received in time. Trigger automatically now sometimes a statsz message received before reload cause issues. try receiving a second time Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -3659,9 +3659,14 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
|
||||
require_NoError(t, err)
|
||||
require_True(t, !v.Tags.Contains(jsExcludePlacement))
|
||||
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, strings.Contains(string(m.Data), `"tags":["server:s-1","intersect"]`))
|
||||
// it is possible that sub already received a stasz message prior to reload, retry once
|
||||
cmp := false
|
||||
for i := 0; i < 2 && !cmp; i++ {
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
cmp = strings.Contains(string(m.Data), `"tags":["server:s-1","intersect"]`)
|
||||
}
|
||||
require_True(t, cmp)
|
||||
|
||||
cfg.Replicas = 3
|
||||
_, err = js.UpdateStream(cfg)
|
||||
|
||||
@@ -60,6 +60,9 @@ type option interface {
|
||||
// IsJetStreamChange inidicates a change in the servers config for JetStream.
|
||||
// Account changes will be handled separately in reloadAuthorization.
|
||||
IsJetStreamChange() bool
|
||||
|
||||
// Indicates a change in the server that requires publishing the server's statz
|
||||
IsStatszChange() bool
|
||||
}
|
||||
|
||||
// noopOption is a base struct that provides default no-op behaviors.
|
||||
@@ -89,6 +92,10 @@ func (n noopOption) IsJetStreamChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (n noopOption) IsStatszChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// loggingOption is a base struct that provides default option behaviors for
|
||||
// logging-related options.
|
||||
type loggingOption struct {
|
||||
@@ -302,6 +309,10 @@ func (u *tagsOption) Apply(server *Server) {
|
||||
server.Noticef("Reloaded: tags")
|
||||
}
|
||||
|
||||
func (u *tagsOption) IsStatszChange() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// usersOption implements the option interface for the authorization `users`
|
||||
// setting.
|
||||
type usersOption struct {
|
||||
@@ -591,6 +602,10 @@ func (jso jetStreamOption) IsJetStreamChange() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (jso jetStreamOption) IsStatszChange() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type ocspOption struct {
|
||||
noopOption
|
||||
newValue *OCSPConfig
|
||||
@@ -1393,6 +1408,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
|
||||
reloadJetstream = false
|
||||
jsEnabled = false
|
||||
reloadTLS = false
|
||||
isStatszChange = false
|
||||
)
|
||||
for _, opt := range opts {
|
||||
opt.Apply(s)
|
||||
@@ -1415,6 +1431,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
|
||||
reloadJetstream = true
|
||||
jsEnabled = opt.(*jetStreamOption).newValue
|
||||
}
|
||||
if opt.IsStatszChange() {
|
||||
isStatszChange = true
|
||||
}
|
||||
}
|
||||
|
||||
if reloadLogging {
|
||||
@@ -1440,6 +1459,8 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
|
||||
}
|
||||
// Make sure to reset the internal loop's version of JS.
|
||||
s.resetInternalLoopInfo()
|
||||
}
|
||||
if isStatszChange {
|
||||
s.sendStatszUpdate()
|
||||
}
|
||||
|
||||
@@ -1579,7 +1600,12 @@ func (s *Server) reloadAuthorization() {
|
||||
newAcc.lleafs = append([]*client(nil), acc.lleafs...)
|
||||
|
||||
newAcc.sl = acc.sl
|
||||
newAcc.rm = acc.rm
|
||||
if acc.rm != nil {
|
||||
newAcc.rm = make(map[string]int32)
|
||||
}
|
||||
for k, v := range acc.rm {
|
||||
newAcc.rm[k] = v
|
||||
}
|
||||
// Transfer internal client state. The configureAccounts call from above may have set up a new one.
|
||||
// We need to use the old one, and the isid to not confuse internal subs.
|
||||
newAcc.ic, newAcc.isid = acc.ic, acc.isid
|
||||
|
||||
@@ -4041,6 +4041,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
mset.outq.unregister()
|
||||
}
|
||||
|
||||
store := mset.store
|
||||
// Clustered cleanup.
|
||||
mset.mu.Unlock()
|
||||
|
||||
@@ -4050,16 +4051,16 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
sysc.closeConnection(ClientClosed)
|
||||
}
|
||||
|
||||
if mset.store == nil {
|
||||
if store == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if deleteFlag {
|
||||
if err := mset.store.Delete(); err != nil {
|
||||
if err := store.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
js.releaseStreamResources(&mset.cfg)
|
||||
} else if err := mset.store.Stop(); err != nil {
|
||||
} else if err := store.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user