diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 88f3b992..e57f1693 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3659,9 +3659,14 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { require_NoError(t, err) require_True(t, !v.Tags.Contains(jsExcludePlacement)) - m, err := sub.NextMsg(time.Second) - require_NoError(t, err) - require_True(t, strings.Contains(string(m.Data), `"tags":["server:s-1","intersect"]`)) + // it is possible that sub already received a stasz message prior to reload, retry once + cmp := false + for i := 0; i < 2 && !cmp; i++ { + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + cmp = strings.Contains(string(m.Data), `"tags":["server:s-1","intersect"]`) + } + require_True(t, cmp) cfg.Replicas = 3 _, err = js.UpdateStream(cfg) diff --git a/server/reload.go b/server/reload.go index 1f7d3a64..50d84939 100644 --- a/server/reload.go +++ b/server/reload.go @@ -60,6 +60,9 @@ type option interface { // IsJetStreamChange inidicates a change in the servers config for JetStream. // Account changes will be handled separately in reloadAuthorization. IsJetStreamChange() bool + + // Indicates a change in the server that requires publishing the server's statz + IsStatszChange() bool } // noopOption is a base struct that provides default no-op behaviors. @@ -89,6 +92,10 @@ func (n noopOption) IsJetStreamChange() bool { return false } +func (n noopOption) IsStatszChange() bool { + return false +} + // loggingOption is a base struct that provides default option behaviors for // logging-related options. type loggingOption struct { @@ -302,6 +309,10 @@ func (u *tagsOption) Apply(server *Server) { server.Noticef("Reloaded: tags") } +func (u *tagsOption) IsStatszChange() bool { + return true +} + // usersOption implements the option interface for the authorization `users` // setting. type usersOption struct { @@ -591,6 +602,10 @@ func (jso jetStreamOption) IsJetStreamChange() bool { return true } +func (jso jetStreamOption) IsStatszChange() bool { + return true +} + type ocspOption struct { noopOption newValue *OCSPConfig @@ -1393,6 +1408,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadJetstream = false jsEnabled = false reloadTLS = false + isStatszChange = false ) for _, opt := range opts { opt.Apply(s) @@ -1415,6 +1431,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadJetstream = true jsEnabled = opt.(*jetStreamOption).newValue } + if opt.IsStatszChange() { + isStatszChange = true + } } if reloadLogging { @@ -1440,6 +1459,8 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { } // Make sure to reset the internal loop's version of JS. s.resetInternalLoopInfo() + } + if isStatszChange { s.sendStatszUpdate() } @@ -1579,7 +1600,12 @@ func (s *Server) reloadAuthorization() { newAcc.lleafs = append([]*client(nil), acc.lleafs...) newAcc.sl = acc.sl - newAcc.rm = acc.rm + if acc.rm != nil { + newAcc.rm = make(map[string]int32) + } + for k, v := range acc.rm { + newAcc.rm[k] = v + } // Transfer internal client state. The configureAccounts call from above may have set up a new one. // We need to use the old one, and the isid to not confuse internal subs. newAcc.ic, newAcc.isid = acc.ic, acc.isid diff --git a/server/stream.go b/server/stream.go index 40801312..f243ebf6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4041,6 +4041,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.outq.unregister() } + store := mset.store // Clustered cleanup. mset.mu.Unlock() @@ -4050,16 +4051,16 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { sysc.closeConnection(ClientClosed) } - if mset.store == nil { + if store == nil { return nil } if deleteFlag { - if err := mset.store.Delete(); err != nil { + if err := store.Delete(); err != nil { return err } js.releaseStreamResources(&mset.cfg) - } else if err := mset.store.Stop(); err != nil { + } else if err := store.Stop(); err != nil { return err }