From aabaf6f10641ebe000e6c48a274dedc3fcf77efc Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 28 Jun 2022 18:36:13 +0200 Subject: [PATCH] [fixed] reload related races (#3222) account.rm had races caused by reload copying rm from one account to another. mset.store was used outside the lock. In rare cases the statsz message was not received in time. Trigger it automatically now. Sometimes a statsz message received before reload causes issues. Try receiving a second time. Signed-off-by: Matthias Hanel --- server/jetstream_cluster_test.go | 11 ++++++++--- server/reload.go | 28 +++++++++++++++++++++++++++- server/stream.go | 7 ++++--- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 88f3b992..e57f1693 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3659,9 +3659,14 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { require_NoError(t, err) require_True(t, !v.Tags.Contains(jsExcludePlacement)) - m, err := sub.NextMsg(time.Second) - require_NoError(t, err) - require_True(t, strings.Contains(string(m.Data), `"tags":["server:s-1","intersect"]`)) + // it is possible that sub already received a statsz message prior to reload, retry once + cmp := false + for i := 0; i < 2 && !cmp; i++ { + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + cmp = strings.Contains(string(m.Data), `"tags":["server:s-1","intersect"]`) + } + require_True(t, cmp) cfg.Replicas = 3 _, err = js.UpdateStream(cfg) diff --git a/server/reload.go b/server/reload.go index 1f7d3a64..50d84939 100644 --- a/server/reload.go +++ b/server/reload.go @@ -60,6 +60,9 @@ type option interface { // IsJetStreamChange inidicates a change in the servers config for JetStream. // Account changes will be handled separately in reloadAuthorization. 
IsJetStreamChange() bool + + // Indicates a change in the server that requires publishing the server's statz + IsStatszChange() bool } // noopOption is a base struct that provides default no-op behaviors. @@ -89,6 +92,10 @@ func (n noopOption) IsJetStreamChange() bool { return false } +func (n noopOption) IsStatszChange() bool { + return false +} + // loggingOption is a base struct that provides default option behaviors for // logging-related options. type loggingOption struct { @@ -302,6 +309,10 @@ func (u *tagsOption) Apply(server *Server) { server.Noticef("Reloaded: tags") } +func (u *tagsOption) IsStatszChange() bool { + return true +} + // usersOption implements the option interface for the authorization `users` // setting. type usersOption struct { @@ -591,6 +602,10 @@ func (jso jetStreamOption) IsJetStreamChange() bool { return true } +func (jso jetStreamOption) IsStatszChange() bool { + return true +} + type ocspOption struct { noopOption newValue *OCSPConfig @@ -1393,6 +1408,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadJetstream = false jsEnabled = false reloadTLS = false + isStatszChange = false ) for _, opt := range opts { opt.Apply(s) @@ -1415,6 +1431,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadJetstream = true jsEnabled = opt.(*jetStreamOption).newValue } + if opt.IsStatszChange() { + isStatszChange = true + } } if reloadLogging { @@ -1440,6 +1459,8 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { } // Make sure to reset the internal loop's version of JS. s.resetInternalLoopInfo() + } + if isStatszChange { s.sendStatszUpdate() } @@ -1579,7 +1600,12 @@ func (s *Server) reloadAuthorization() { newAcc.lleafs = append([]*client(nil), acc.lleafs...) newAcc.sl = acc.sl - newAcc.rm = acc.rm + if acc.rm != nil { + newAcc.rm = make(map[string]int32) + } + for k, v := range acc.rm { + newAcc.rm[k] = v + } // Transfer internal client state. 
The configureAccounts call from above may have set up a new one. // We need to use the old one, and the isid to not confuse internal subs. newAcc.ic, newAcc.isid = acc.ic, acc.isid diff --git a/server/stream.go b/server/stream.go index 40801312..f243ebf6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4041,6 +4041,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.outq.unregister() } + store := mset.store // Clustered cleanup. mset.mu.Unlock() @@ -4050,16 +4051,16 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { sysc.closeConnection(ClientClosed) } - if mset.store == nil { + if store == nil { return nil } if deleteFlag { - if err := mset.store.Delete(); err != nil { + if err := store.Delete(); err != nil { return err } js.releaseStreamResources(&mset.cfg) - } else if err := mset.store.Stop(); err != nil { + } else if err := store.Stop(); err != nil { return err }