From 2f2498ab7edb839de82b8039eb7f587511478595 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 10 May 2023 15:32:45 -0700 Subject: [PATCH 1/3] Bump to 2.9.17-beta.7 Signed-off-by: Derek Collison --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index 336e9bcd..99eaf1b2 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.9.17-beta.6" + VERSION = "2.9.17-beta.7" // PROTO is the currently supported protocol. // 0 was the original From 5e029d08d51056a093a20ec6caefcc38a01db1c5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 10 May 2023 17:59:28 -0700 Subject: [PATCH 2/3] For older R1 streams created by previous servers we could have no cluster for the stream assignment group which would prevent scale up with newer servers. This will inherit cluster if detected from placement tags or client cluster designation. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 12 ++++++++ server/jetstream_cluster_3_test.go | 49 ++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 13322e3e..baed19e4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5868,6 +5868,18 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su if isReplicaChange { // We are adding new peers here. if newCfg.Replicas > len(rg.Peers) { + // Check if we do not have a cluster assigned, and if we do not make sure we + // try to pick one. This could happen with older streams that were assigned by + // previous servers. + if rg.Cluster == _EMPTY_ { + // Prefer placement directrives if we have them. + if newCfg.Placement != nil && newCfg.Placement.Cluster != _EMPTY_ { + rg.Cluster = newCfg.Placement.Cluster + } else { + // Fall back to the cluster assignment from the client. + rg.Cluster = ci.Cluster + } + } peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0, nil) if err != nil { resp.Error = NewJSClusterNoPeersError(err) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 7c00c3f0..82faceaa 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3997,3 +3997,52 @@ func TestJetStreamClusterStreamAccountingDriftFixups(t *testing.T) { require_NoError(t, err) require_True(t, jsz.JetStreamStats.Store == 0) } + +// Some older streams seem to have been created or exist with no explicit cluster setting. +// For server <= 2.9.16 you could not scale the streams up since we could not place them in another cluster. +func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + }) + require_NoError(t, err) + + // Manually going to grab stream assignment and update it to be without the group cluster. + s := c.streamLeader(globalAccountName, "TEST") + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + sa := mset.streamAssignment() + require_NotNil(t, sa) + // Make copy to not change stream's + sa = sa.copyGroup() + // Remove cluster and preferred. + sa.Group.Cluster = _EMPTY_ + sa.Group.Preferred = _EMPTY_ + // Insert into meta layer. + s.mu.RLock() + s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa)) + s.mu.RUnlock() + // Make sure it got propagated.. + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + sa := mset.streamAssignment().copyGroup() + require_NotNil(t, sa) + if sa.Group.Cluster != _EMPTY_ { + return fmt.Errorf("Cluster still not cleared") + } + return nil + }) + // Now we know it has been nil'd out. Make sure we can scale up. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) +} From 286a1632caeeba1f4fa68469be4a4c26ca9e13ac Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 5 May 2023 11:24:18 -0700 Subject: [PATCH 3/3] Use monotonic time for measuring time internally Signed-off-by: Waldemar Quevedo --- server/accounts.go | 4 ++-- server/gateway.go | 2 +- server/monitor.go | 6 +++--- server/mqtt.go | 2 +- server/server.go | 12 ++++++------ 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 5539a6d8..8f62104b 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3502,7 +3502,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.jsLimits = nil } - a.updated = time.Now().UTC() + a.updated = time.Now() clients := a.getClientsLocked() a.mu.Unlock() @@ -3961,7 +3961,7 @@ func removeCb(s *Server, pubKey string) { a.mpay = 0 a.mconns = 0 a.mleafs = 0 - a.updated = time.Now().UTC() + a.updated = time.Now() jsa := a.js a.mu.Unlock() // set the account to be expired and disconnect clients diff --git a/server/gateway.go b/server/gateway.go index 7907b014..afc75683 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -761,7 +761,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { // Snapshot server options. opts := s.getOpts() - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY} // Are we creating the gateway based on the configuration diff --git a/server/monitor.go b/server/monitor.go index 66045cd0..781bfeda 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -783,7 +783,7 @@ type RouteInfo struct { // Routez returns a Routez struct containing information about routes. func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { rs := &Routez{Routes: []*RouteInfo{}} - rs.Now = time.Now().UTC() + rs.Now = time.Now() if routezOpts == nil { routezOpts = &RoutezOptions{} @@ -970,7 +970,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { slStats := &SublistStats{} // FIXME(dlc) - Make account aware. - sz := &Subsz{s.info.ID, time.Now().UTC(), slStats, 0, offset, limit, nil} + sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil} if subdetail { var raw [4096]*subscription @@ -1595,7 +1595,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.MaxPending = opts.MaxPending v.TLSTimeout = opts.TLSTimeout v.WriteDeadline = opts.WriteDeadline - v.ConfigLoadTime = s.configTime + v.ConfigLoadTime = s.configTime.UTC() // Update route URLs if applicable if s.varzUpdateRouteURLs { v.Cluster.URLs = urlsToStrings(opts.Routes) diff --git a/server/mqtt.go b/server/mqtt.go index 9353563a..2d214e01 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -422,7 +422,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client { if maxSubs == 0 { maxSubs = -1 } - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: &mqtt{}, ws: ws} c.headers = true diff --git a/server/server.go b/server/server.go index 545b4b1c..33230f1e 100644 --- a/server/server.go +++ b/server/server.go @@ -373,7 +373,7 @@ func NewServer(opts *Options) (*Server, error) { info.TLSAvailable = true } - now := time.Now().UTC() + now := time.Now() s := &Server{ kp: kp, @@ -884,7 +884,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) s.mu.Lock() acc.mu.Lock() } - acc.updated = time.Now().UTC() + acc.updated = time.Now() acc.mu.Unlock() return true }) @@ -1383,7 +1383,7 @@ func (s *Server) createInternalClient(kind int) *client { if kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT { return nil } - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now} c.initClient() c.echo = false @@ -1453,7 +1453,7 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { acc.lqws = make(map[string]int32) } acc.srv = s - acc.updated = time.Now().UTC() + acc.updated = time.Now() accName := acc.Name jsEnabled := len(acc.jsLimits) > 0 acc.mu.Unlock() @@ -2581,7 +2581,7 @@ func (s *Server) createClient(conn net.Conn) *client { if maxSubs == 0 { maxSubs = -1 } - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now} @@ -2748,7 +2748,7 @@ func (s *Server) createClient(conn net.Conn) *client { // This will save off a closed client in a ring buffer such that // /connz can inspect. Useful for debugging, etc. func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { - now := time.Now().UTC() + now := time.Now() s.accountDisconnectEvent(c, now, reason.String())