diff --git a/server/accounts.go b/server/accounts.go index 04535073..94074edc 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3532,7 +3532,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.jsLimits = nil } - a.updated = time.Now().UTC() + a.updated = time.Now() clients := a.getClientsLocked() a.mu.Unlock() @@ -3991,7 +3991,7 @@ func removeCb(s *Server, pubKey string) { a.mpay = 0 a.mconns = 0 a.mleafs = 0 - a.updated = time.Now().UTC() + a.updated = time.Now() jsa := a.js a.mu.Unlock() // set the account to be expired and disconnect clients diff --git a/server/gateway.go b/server/gateway.go index bb3ae71b..900b903b 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -761,7 +761,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { // Snapshot server options. opts := s.getOpts() - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY} // Are we creating the gateway based on the configuration diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 13322e3e..baed19e4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5868,6 +5868,18 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su if isReplicaChange { // We are adding new peers here. if newCfg.Replicas > len(rg.Peers) { + // Check if we do not have a cluster assigned, and if we do not make sure we + // try to pick one. This could happen with older streams that were assigned by + // previous servers. + if rg.Cluster == _EMPTY_ { + // Prefer placement directives if we have them. + if newCfg.Placement != nil && newCfg.Placement.Cluster != _EMPTY_ { + rg.Cluster = newCfg.Placement.Cluster + } else { + // Fall back to the cluster assignment from the client. 
+ rg.Cluster = ci.Cluster + } + } peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0, nil) if err != nil { resp.Error = NewJSClusterNoPeersError(err) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 899f1ff8..f2f19bbc 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3997,3 +3997,52 @@ func TestJetStreamClusterStreamAccountingDriftFixups(t *testing.T) { require_NoError(t, err) require_True(t, jsz.JetStreamStats.Store == 0) } + +// Some older streams seem to have been created or exist with no explicit cluster setting. +// For server <= 2.9.16 you could not scale the streams up since we could not place them in another cluster. +func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + }) + require_NoError(t, err) + + // Manually going to grab stream assignment and update it to be without the group cluster. + s := c.streamLeader(globalAccountName, "TEST") + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + sa := mset.streamAssignment() + require_NotNil(t, sa) + // Make copy to not change stream's + sa = sa.copyGroup() + // Remove cluster and preferred. + sa.Group.Cluster = _EMPTY_ + sa.Group.Preferred = _EMPTY_ + // Insert into meta layer. + s.mu.RLock() + s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa)) + s.mu.RUnlock() + // Make sure it got propagated. + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + sa := mset.streamAssignment().copyGroup() + require_NotNil(t, sa) + if sa.Group.Cluster != _EMPTY_ { + return fmt.Errorf("Cluster still not cleared") + } + return nil + }) + // Now we know it has been nil'd out. Make sure we can scale up. 
+ _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) +} diff --git a/server/monitor.go b/server/monitor.go index 09b7d15d..e67b4052 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -787,7 +787,7 @@ type RouteInfo struct { // Routez returns a Routez struct containing information about routes. func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { rs := &Routez{Routes: []*RouteInfo{}} - rs.Now = time.Now().UTC() + rs.Now = time.Now() if routezOpts == nil { routezOpts = &RoutezOptions{} @@ -980,7 +980,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { slStats := &SublistStats{} // FIXME(dlc) - Make account aware. - sz := &Subsz{s.info.ID, time.Now().UTC(), slStats, 0, offset, limit, nil} + sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil} if subdetail { var raw [4096]*subscription @@ -1607,7 +1607,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.MaxPending = opts.MaxPending v.TLSTimeout = opts.TLSTimeout v.WriteDeadline = opts.WriteDeadline - v.ConfigLoadTime = s.configTime + v.ConfigLoadTime = s.configTime.UTC() // Update route URLs if applicable if s.varzUpdateRouteURLs { v.Cluster.URLs = urlsToStrings(opts.Routes) diff --git a/server/mqtt.go b/server/mqtt.go index ed1893e3..bada616b 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -422,7 +422,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client { if maxSubs == 0 { maxSubs = -1 } - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: &mqtt{}, ws: ws} c.headers = true diff --git a/server/server.go b/server/server.go index 5c3d2c45..a341d053 100644 --- a/server/server.go +++ b/server/server.go @@ -606,7 +606,7 @@ func NewServer(opts *Options) (*Server, error) { info.TLSAvailable = true } - now := time.Now().UTC() + now := time.Now() s := &Server{ kp: kp, 
@@ -1154,7 +1154,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) s.mu.Lock() acc.mu.Lock() } - acc.updated = time.Now().UTC() + acc.updated = time.Now() acc.mu.Unlock() return true }) @@ -1647,7 +1647,7 @@ func (s *Server) createInternalClient(kind int) *client { if kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT { return nil } - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now} c.initClient() c.echo = false @@ -1718,7 +1718,7 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { acc.lqws = make(map[string]int32) } acc.srv = s - acc.updated = time.Now().UTC() + acc.updated = time.Now() accName := acc.Name jsEnabled := len(acc.jsLimits) > 0 acc.mu.Unlock() @@ -2890,7 +2890,7 @@ func (s *Server) createClient(conn net.Conn) *client { if maxSubs == 0 { maxSubs = -1 } - now := time.Now().UTC() + now := time.Now() c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now} @@ -3057,7 +3057,7 @@ func (s *Server) createClient(conn net.Conn) *client { // This will save off a closed client in a ring buffer such that // /connz can inspect. Useful for debugging, etc. func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { - now := time.Now().UTC() + now := time.Now() s.accountDisconnectEvent(c, now, reason.String())