diff --git a/server/client.go b/server/client.go index 9670fd58..b1edd1e4 100644 --- a/server/client.go +++ b/server/client.go @@ -3948,11 +3948,19 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil { ci = &cis ci.Service = acc.Name + // Check if we are moving into a share details account from a non-shared + // and add in server and cluster details. + if !share && si.share { + c.addServerAndClusterInfo(ci) + } } } else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 { ci = c.getClientInfo(share) + } else if c.kind == LEAF && si.share { + // We have a leaf header here for ci, augment as above. + ci = c.getClientInfo(si.share) } - + // Set clientInfo if present. if ci != nil { if b, _ := json.Marshal(ci); b != nil { msg = c.setHeader(ClientInfoHdr, string(b), msg) @@ -5034,31 +5042,52 @@ func (ci *ClientInfo) serviceAccount() string { return ci.Account } +// Add in our server and cluster information to this client info. +func (c *client) addServerAndClusterInfo(ci *ClientInfo) { + if ci == nil { + return + } + // Server + if c.kind != LEAF { + ci.Server = c.srv.Name() + } else if c.kind == LEAF { + ci.Server = c.leaf.remoteServer + } + // Cluster + ci.Cluster = c.srv.cachedClusterName() + // If we have gateways fill in cluster alternates. + // These will be in RTT asc order. + if c.srv.gateway.enabled { + var gws []*client + c.srv.getOutboundGatewayConnections(&gws) + for _, c := range gws { + c.mu.Lock() + cn := c.gw.name + c.mu.Unlock() + ci.Alternates = append(ci.Alternates, cn) + } + } +} + // Grabs the information for this client. func (c *client) getClientInfo(detailed bool) *ClientInfo { - if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM) { + if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM && c.kind != ACCOUNT) { return nil } - // Server name. 
Defaults to server ID if not set explicitly. - var cn, sn string + // Result + var ci ClientInfo + if detailed { - if c.kind != LEAF { - sn = c.srv.Name() - } - cn = c.srv.cachedClusterName() + c.addServerAndClusterInfo(&ci) } c.mu.Lock() - var ci ClientInfo // RTT and Account are always added. ci.Account = accForClient(c) ci.RTT = c.rtt // Detailed signals additional opt in. if detailed { - if c.kind == LEAF { - sn = c.leaf.remoteServer - } ci.Start = &c.start ci.Host = c.host ci.ID = c.cid @@ -5066,8 +5095,6 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo { ci.User = c.getRawAuthUser() ci.Lang = c.opts.Lang ci.Version = c.opts.Version - ci.Server = sn - ci.Cluster = cn ci.Jwt = c.opts.JWT ci.IssuerKey = issuerForClient(c) ci.NameTag = c.nameTag diff --git a/server/consumer.go b/server/consumer.go index 29781569..90dba309 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2104,8 +2104,8 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, sendBatch(&wr) } else { // Check for API outstanding requests. 
- if apiOut := atomic.AddInt64(&js.apiCalls, 1); apiOut > maxJSApiOut { - atomic.AddInt64(&js.apiCalls, -1) + if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut { + atomic.AddInt64(&js.apiInflight, -1) sendErr(503, "JetStream API limit exceeded") s.Warnf("JetStream API limit exceeded: %d calls outstanding", apiOut) return @@ -2115,7 +2115,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, o.mu.Lock() sendBatch(&wr) o.mu.Unlock() - atomic.AddInt64(&js.apiCalls, -1) + atomic.AddInt64(&js.apiInflight, -1) }() } } diff --git a/server/errors.json b/server/errors.json index 5a2e857c..a9e74658 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1108,5 +1108,15 @@ "help": "Returned when the delivery subject on a Push Consumer is not a valid NATS Subject", "url": "", "deprecates": "" + }, + { + "constant": "JSStreamMaxBytesRequired", + "code": 400, + "error_code": 10113, + "description": "account requires a stream config to have max bytes set", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/events.go b/server/events.go index d1b14e4e..8454a0b4 100644 --- a/server/events.go +++ b/server/events.go @@ -1,4 +1,4 @@ -// Copyright 2018-2021 The NATS Authors +// Copyright 2018-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -181,6 +181,7 @@ type ClientInfo struct { RTT time.Duration `json:"rtt,omitempty"` Server string `json:"server,omitempty"` Cluster string `json:"cluster,omitempty"` + Alternates []string `json:"alts,omitempty"` Stop *time.Time `json:"stop,omitempty"` Jwt string `json:"jwt,omitempty"` IssuerKey string `json:"issuer_key,omitempty"` @@ -669,6 +670,14 @@ func (s *Server) sendStatsz(subj string) { jStat.Config = &c js.mu.RUnlock() jStat.Stats = js.usageStats() + // Update our own usage since we do not echo so we will not hear ourselves. + ourNode := string(getHash(s.serverName())) + if v, ok := s.nodeToInfo.Load(ourNode); ok && v != nil { + ni := v.(nodeInfo) + ni.stats = jStat.Stats + s.nodeToInfo.Store(ourNode, ni) + } + // Metagroup info. if mg := js.getMetaGroup(); mg != nil { if mg.Leader() { if ci := s.raftNodeToClusterInfo(mg); ci != nil { @@ -695,9 +704,11 @@ func (s *Server) sendStatsz(subj string) { func (s *Server) heartbeatStatsz() { if s.sys.stmr != nil { // Increase after startup to our max. - s.sys.cstatsz *= 4 - if s.sys.cstatsz > s.sys.statsz { - s.sys.cstatsz = s.sys.statsz + if s.sys.cstatsz < s.sys.statsz { + s.sys.cstatsz *= 2 + if s.sys.cstatsz > s.sys.statsz { + s.sys.cstatsz = s.sys.statsz + } } s.sys.stmr.Reset(s.sys.cstatsz) } @@ -714,7 +725,7 @@ func (s *Server) sendStatszUpdate() { func (s *Server) startStatszTimer() { // We will start by sending out more of these and trail off to the statsz being the max. s.sys.cstatsz = 250 * time.Millisecond - // Send out the first one after 250ms. + // Send out the first one quickly, we will slowly back off. s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz)) } @@ -1031,10 +1042,10 @@ func (s *Server) processRemoteServerShutdown(sid string) { }) // Update any state in nodeInfo. 
s.nodeToInfo.Range(func(k, v interface{}) bool { - si := v.(nodeInfo) - if si.id == sid { - si.offline = true - s.nodeToInfo.Store(k, si) + ni := v.(nodeInfo) + if ni.id == sid { + ni.offline = true + s.nodeToInfo.Store(k, ni) return false } return true @@ -1071,12 +1082,14 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, s.Debugf("Received bad server info for remote server shutdown") return } - // Additional processing here. - if !s.sameDomain(si.Domain) { - return - } + + // JetStream node updates if applicable. node := string(getHash(si.Name)) - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, true, true}) + if v, ok := s.nodeToInfo.Load(node); ok && v != nil { + ni := v.(nodeInfo) + ni.offline = true + s.nodeToInfo.Store(node, ni) + } sid := toks[serverSubjectIndex] if su := s.sys.servers[sid]; su != nil { @@ -1095,44 +1108,66 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su return } si := ssm.Server + + // JetStream node updates. if !s.sameDomain(si.Domain) { return } + var cfg *JetStreamConfig + var stats *JetStreamStats + + if ssm.Stats.JetStream != nil { + cfg = ssm.Stats.JetStream.Config + stats = ssm.Stats.JetStream.Stats + } + node := string(getHash(si.Name)) - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, false, si.JetStream}) + s.nodeToInfo.Store(node, nodeInfo{ + si.Name, + si.Cluster, + si.Domain, + si.ID, + cfg, + stats, + false, si.JetStream, + }) } // updateRemoteServer is called when we have an update from a remote server. // This allows us to track remote servers, respond to shutdown messages properly, // make sure that messages are ordered, and allow us to prune dead servers. // Lock should be held upon entry. 
-func (s *Server) updateRemoteServer(ms *ServerInfo) { - su := s.sys.servers[ms.ID] +func (s *Server) updateRemoteServer(si *ServerInfo) { + su := s.sys.servers[si.ID] if su == nil { - s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()} - s.processNewServer(ms) + s.sys.servers[si.ID] = &serverUpdate{si.Seq, time.Now()} + s.processNewServer(si) } else { // Should always be going up. - if ms.Seq <= su.seq { - s.Errorf("Received out of order remote server update from: %q", ms.ID) + if si.Seq <= su.seq { + s.Errorf("Received out of order remote server update from: %q", si.ID) return } - su.seq = ms.Seq + su.seq = si.Seq su.ltime = time.Now() } } // processNewServer will hold any logic we want to use when we discover a new server. // Lock should be held upon entry. -func (s *Server) processNewServer(ms *ServerInfo) { +func (s *Server) processNewServer(si *ServerInfo) { // Right now we only check if we have leafnode servers and if so send another // connect update to make sure they switch this account to interest only mode. s.ensureGWsInterestOnlyForLeafNodes() + // Add to our nodeToName - if s.sameDomain(ms.Domain) { - node := string(getHash(ms.Name)) - s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.Domain, ms.ID, false, ms.JetStream}) + if s.sameDomain(si.Domain) { + node := string(getHash(si.Name)) + // Only update if non-existent + if _, ok := s.nodeToInfo.Load(node); !ok { + s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, nil, nil, false, si.JetStream}) + } } // Announce ourselves.. s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) @@ -1378,9 +1413,15 @@ type ServerAPIConnzResponse struct { // statszReq is a request for us to respond with current statsz. func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - if !s.EventsEnabled() || reply == _EMPTY_ { + if !s.EventsEnabled() { return } + + // No reply is a signal that we should use our normal broadcast subject. 
+ if reply == _EMPTY_ { + reply = fmt.Sprintf(serverStatsSubj, s.info.ID) + } + opts := StatszEventOptions{} if _, msg := c.msgParts(rmsg); len(msg) != 0 { if err := json.Unmarshal(msg, &opts); err != nil { diff --git a/server/jetstream.go b/server/jetstream.go index fdb66112..6b62a5f1 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -48,22 +48,22 @@ type JetStreamConfig struct { Domain string `json:"domain,omitempty"` } +// Statistics about JetStream for this server. type JetStreamStats struct { - Memory uint64 `json:"memory"` - Store uint64 `json:"storage"` - ReservedMemoryUsed uint64 `json:"reserved_memory_used,omitempty"` - ReserveStoreUsed uint64 `json:"reserved_storage_used,omitempty"` - Accounts int `json:"accounts,omitempty"` - API JetStreamAPIStats `json:"api"` - ReservedMemory uint64 `json:"reserved_memory,omitempty"` - ReservedStore uint64 `json:"reserved_storage,omitempty"` + Memory uint64 `json:"memory"` + Store uint64 `json:"storage"` + ReservedMemory uint64 `json:"reserved_memory"` + ReservedStore uint64 `json:"reserved_storage"` + Accounts int `json:"accounts"` + API JetStreamAPIStats `json:"api"` } type JetStreamAccountLimits struct { - MaxMemory int64 `json:"max_memory"` - MaxStore int64 `json:"max_storage"` - MaxStreams int `json:"max_streams"` - MaxConsumers int `json:"max_consumers"` + MaxMemory int64 `json:"max_memory"` + MaxStore int64 `json:"max_storage"` + MaxStreams int `json:"max_streams"` + MaxConsumers int `json:"max_consumers"` + MaxBytesRequired bool `json:"max_bytes_required"` } // JetStreamAccountStats returns current statistics about the account's JetStream usage. 
@@ -78,27 +78,28 @@ type JetStreamAccountStats struct { } type JetStreamAPIStats struct { - Total uint64 `json:"total"` - Errors uint64 `json:"errors"` + Total uint64 `json:"total"` + Errors uint64 `json:"errors"` + Inflight uint64 `json:"inflight,omitempty"` } // This is for internal accounting for JetStream for this server. type jetStream struct { // These are here first because of atomics on 32bit systems. + apiInflight int64 + apiTotal int64 + apiErrors int64 memReserved int64 storeReserved int64 - apiCalls int64 - apiErrors int64 - memTotal int64 - storeTotal int64 - memTotalRes int64 - storeTotalRes int64 + memUsed int64 + storeUsed int64 mu sync.RWMutex srv *Server config JetStreamConfig cluster *jetStreamCluster accounts map[string]*jsAccount apiSubs *Sublist + standAlone bool disabled bool oos bool } @@ -306,8 +307,10 @@ func (s *Server) checkStoreDir(cfg *JetStreamConfig) error { // enableJetStream will start up the JetStream subsystem. func (s *Server) enableJetStream(cfg JetStreamConfig) error { + js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()} + s.mu.Lock() - s.js = &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()} + s.js = js s.mu.Unlock() // FIXME(dlc) - Allow memory only operation? @@ -360,19 +363,21 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Debugf(" %s", jsAllAPI) s.setupJetStreamExports() - // Enable accounts and restore state before starting clustering. 
- if err := s.enableJetStreamAccounts(); err != nil { - return err - } - - canExtend := s.canExtendOtherDomain() - standAlone := s.standAloneMode() + standAlone, canExtend := s.standAloneMode(), s.canExtendOtherDomain() if standAlone && canExtend && s.getOpts().JetStreamExtHint != jsWillExtend { canExtend = false s.Noticef("Standalone server started in clustered mode do not support extending domains") s.Noticef(`Manually disable standalone mode by setting the JetStream Option "extension_hint: %s"`, jsWillExtend) } + // Indicate if we will be standalone for checking resource reservations, etc. + js.setJetStreamStandAlone(standAlone && !canExtend) + + // Enable accounts and restore state before starting clustering. + if err := s.enableJetStreamAccounts(); err != nil { + return err + } + // If we are in clustered mode go ahead and start the meta controller. if !standAlone || canExtend { if err := s.enableJetStreamClustering(); err != nil { @@ -706,6 +711,16 @@ func (js *jetStream) isEnabled() bool { return !js.disabled } +// Mark that we will be in standalone mode. +func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { + if js == nil { + return + } + js.mu.Lock() + defer js.mu.Unlock() + js.standAlone = isStandAlone +} + // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { var js *jetStream @@ -927,30 +942,30 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { limits = dynamicJSAccountLimits } + a.assignJetStreamLimits(limits) + js := s.getJetStream() if js == nil { - a.assignJetStreamLimits(limits) return NewJSNotEnabledError() } - a.assignJetStreamLimits(limits) - js.mu.Lock() - // Check the limits against existing reservations. if _, ok := js.accounts[a.Name]; ok && a.JetStreamEnabled() { js.mu.Unlock() return fmt.Errorf("jetstream already enabled for account") } + + // Check the limits against existing reservations. 
if err := js.sufficientResources(limits); err != nil { js.mu.Unlock() return err } + jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq} jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) jsa.storeDir = path.Join(js.config.StoreDir, a.Name) js.accounts[a.Name] = jsa - js.reserveResources(limits) js.mu.Unlock() sysNode := s.Node() @@ -1223,6 +1238,18 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { return nil } +// Return whether or not we require MaxBytes to be set. +func (a *Account) maxBytesRequired() bool { + a.mu.RLock() + defer a.mu.RUnlock() + + jsa := a.js + if jsa == nil { + return false + } + return jsa.limits.MaxBytesRequired +} + // NumStreams will return how many streams we have. func (a *Account) numStreams() int { a.mu.RLock() @@ -1293,8 +1320,7 @@ func (a *Account) lookupStream(name string) (*stream, error) { // UpdateJetStreamLimits will update the account limits for a JetStream enabled account. func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { a.mu.RLock() - s := a.srv - jsa := a.js + s, jsa := a.srv, a.js a.mu.RUnlock() if s == nil { @@ -1315,7 +1341,6 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { // Calculate the delta between what we have and what we want. jsa.mu.Lock() dl := diffCheckedLimits(&jsa.limits, limits) - jsaLimits := jsa.limits jsa.mu.Unlock() js.mu.Lock() @@ -1324,23 +1349,6 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { js.mu.Unlock() return err } - // FIXME(dlc) - If we drop and are over the max on memory or store, do we delete?? 
- js.releaseResources(&jsaLimits) - js.reserveResources(limits) - if jsaLimits.MaxMemory >= 0 && limits.MaxMemory < 0 { - // we had a reserve and are now dropping it - atomic.AddInt64(&js.memTotalRes, -jsa.memTotal) - } else if jsaLimits.MaxMemory < 0 && limits.MaxMemory >= 0 { - // we had no reserve and are now adding it - atomic.AddInt64(&js.memTotalRes, jsa.memTotal) - } - if jsaLimits.MaxStore >= 0 && limits.MaxStore < 0 { - // we had a reserve and are now dropping it - atomic.AddInt64(&js.storeTotalRes, -jsa.storeTotal) - } else if jsaLimits.MaxStore < 0 && limits.MaxStore >= 0 { - // we had no reserve and are now adding it - atomic.AddInt64(&js.storeTotalRes, jsa.storeTotal) - } js.mu.Unlock() // Update @@ -1427,7 +1435,6 @@ func (js *jetStream) disableJetStream(jsa *jsAccount) error { js.mu.Lock() delete(js.accounts, jsa.account.Name) - js.releaseResources(&jsa.limits) js.mu.Unlock() jsa.delete() @@ -1518,17 +1525,11 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) { if storeType == MemoryStorage { jsa.usage.mem += delta jsa.memTotal += delta - atomic.AddInt64(&js.memTotal, delta) - if jsa.limits.MaxMemory > 0 { - atomic.AddInt64(&js.memTotalRes, delta) - } + atomic.AddInt64(&js.memUsed, delta) } else { jsa.usage.store += delta jsa.storeTotal += delta - atomic.AddInt64(&js.storeTotal, delta) - if jsa.limits.MaxStore > 0 { - atomic.AddInt64(&js.storeTotalRes, delta) - } + atomic.AddInt64(&js.storeUsed, delta) } // Publish our local updates if in clustered mode. if isClustered { @@ -1550,7 +1551,7 @@ func (jsa *jsAccount) sendClusterUsageUpdateTimer() { // Send updates to our account usage for this server. // Lock should be held. func (jsa *jsAccount) sendClusterUsageUpdate() { - if jsa.js == nil || jsa.js.srv == nil { + if jsa.js == nil || jsa.js.srv == nil || jsa.sendq == nil { return } // These values are absolute so we can limit send rates. 
@@ -1566,9 +1567,8 @@ func (jsa *jsAccount) sendClusterUsageUpdate() { le.PutUint64(b[8:], uint64(jsa.usage.store)) le.PutUint64(b[16:], uint64(jsa.usage.api)) le.PutUint64(b[24:], uint64(jsa.usage.err)) - if jsa.sendq != nil { - jsa.sendq <- &pubMsg{nil, jsa.updatesPub, _EMPTY_, nil, nil, b, noCompression, false, false} - } + + jsa.sendq <- &pubMsg{nil, jsa.updatesPub, _EMPTY_, nil, nil, b, noCompression, false, false} } func (js *jetStream) wouldExceedLimits(storeType StorageType, sz int) bool { @@ -1577,9 +1577,9 @@ func (js *jetStream) wouldExceedLimits(storeType StorageType, sz int) bool { max int64 ) if storeType == MemoryStorage { - total, max = &js.memTotal, js.config.MaxMemory + total, max = &js.memUsed, js.config.MaxMemory } else { - total, max = &js.storeTotal, js.config.MaxStore + total, max = &js.storeUsed, js.config.MaxStore } return atomic.LoadInt64(total) > (max + int64(sz)) } @@ -1605,9 +1605,19 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool { return false } +// Check account limits. +func (jsa *jsAccount) checkAccountLimits(config *StreamConfig) error { + return jsa.checkLimits(config, false) +} + +// Check account and server limits. +func (jsa *jsAccount) checkAllLimits(config *StreamConfig) error { + return jsa.checkLimits(config, true) +} + // Check if a new proposed msg set while exceed our account limits. // Lock should be held. -func (jsa *jsAccount) checkLimits(config *StreamConfig) error { +func (jsa *jsAccount) checkLimits(config *StreamConfig, checkServer bool) error { if jsa.limits.MaxStreams > 0 && len(jsa.streams) >= jsa.limits.MaxStreams { return NewJSMaximumStreamsLimitError() } @@ -1617,13 +1627,13 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error { } // Check storage, memory or disk. 
- return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas) + return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas, checkServer) } -// Check if additional bytes will exceed our account limits. +// Check if additional bytes will exceed our account limits and optionally the server itself. // This should account for replicas. // Lock should be held. -func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int) error { +func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int, checkServer bool) error { if replicas < 1 { replicas = 1 } @@ -1639,11 +1649,10 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, repl if jsa.memReserved+totalBytes > jsa.limits.MaxMemory { return NewJSMemoryResourcesExceededError() } - } else { - // Account is unlimited, check if this server can handle request. - if js.memReserved+addBytes > js.config.MaxMemory { - return NewJSMemoryResourcesExceededError() - } + } + // Check if this server can handle request. + if checkServer && js.memReserved+addBytes > js.config.MaxMemory { + return NewJSMemoryResourcesExceededError() } case FileStorage: // Account limits defined. @@ -1651,11 +1660,10 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, repl if jsa.storeReserved+totalBytes > jsa.limits.MaxStore { return NewJSStorageResourcesExceededError() } - } else { - // Account is unlimited, check if this server can handle request. - if js.storeReserved+addBytes > js.config.MaxStore { - return NewJSStorageResourcesExceededError() - } + } + // Check if this server can handle request. 
+ if checkServer && js.storeReserved+addBytes > js.config.MaxStore { + return NewJSStorageResourcesExceededError() } } @@ -1721,57 +1729,96 @@ func (js *jetStream) usageStats() *JetStreamStats { stats.ReservedMemory = (uint64)(js.memReserved) stats.ReservedStore = (uint64)(js.storeReserved) js.mu.RUnlock() - stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiCalls)) + stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiTotal)) stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors)) - stats.Memory = (uint64)(atomic.LoadInt64(&js.memTotal)) - stats.Store = (uint64)(atomic.LoadInt64(&js.storeTotal)) - stats.ReservedMemoryUsed = (uint64)(atomic.LoadInt64(&js.memTotalRes)) - stats.ReserveStoreUsed = (uint64)(atomic.LoadInt64(&js.storeTotalRes)) + stats.API.Inflight = (uint64)(atomic.LoadInt64(&js.apiInflight)) + stats.Memory = (uint64)(atomic.LoadInt64(&js.memUsed)) + stats.Store = (uint64)(atomic.LoadInt64(&js.storeUsed)) return &stats } // Check to see if we have enough system resources for this account. // Lock should be held. func (js *jetStream) sufficientResources(limits *JetStreamAccountLimits) error { - if limits == nil { + // If we are clustered we do not really know how many resources will be ultimately available. + // This needs to be handled out of band. + // If we are a single server, we can make decisions here. + if limits == nil || !js.standAlone { return nil } + + // Reserved is now specific to the MaxBytes for streams. if js.memReserved+limits.MaxMemory > js.config.MaxMemory { return NewJSMemoryResourcesExceededError() } if js.storeReserved+limits.MaxStore > js.config.MaxStore { return NewJSStorageResourcesExceededError() } + + // Since we know if we are here we are single server mode, check the account reservations. 
+ var storeReserved, memReserved int64 + for _, jsa := range js.accounts { + jsa.mu.RLock() + if jsa.limits.MaxMemory > 0 { + memReserved += jsa.limits.MaxMemory + } + if jsa.limits.MaxStore > 0 { + storeReserved += jsa.limits.MaxStore + } + jsa.mu.RUnlock() + } + + if memReserved+limits.MaxMemory > js.config.MaxMemory { + return NewJSMemoryResourcesExceededError() + } + if storeReserved+limits.MaxStore > js.config.MaxStore { + return NewJSStorageResourcesExceededError() + } + return nil } -// This will (blindly) reserve the respources requested. -// Lock should be held. -func (js *jetStream) reserveResources(limits *JetStreamAccountLimits) error { - if limits == nil { - return nil +// This will reserve the stream resources requested. +// This will spin off of MaxBytes. +func (js *jetStream) reserveStreamResources(cfg *StreamConfig) { + if cfg == nil || cfg.MaxBytes <= 0 { + return } - if limits.MaxMemory > 0 { - js.memReserved += limits.MaxMemory + + js.mu.Lock() + switch cfg.Storage { + case MemoryStorage: + js.memReserved += cfg.MaxBytes + case FileStorage: + js.storeReserved += cfg.MaxBytes } - if limits.MaxStore > 0 { - js.storeReserved += limits.MaxStore + s, clustered := js.srv, !js.standAlone + js.mu.Unlock() + // If clustered send an update to the system immediately. + if clustered { + s.sendStatszUpdate() } - return nil } -// Lock should be held. -func (js *jetStream) releaseResources(limits *JetStreamAccountLimits) error { - if limits == nil { - return nil +// Release reserved resources held by a stream. 
+func (js *jetStream) releaseStreamResources(cfg *StreamConfig) { + if cfg == nil || cfg.MaxBytes <= 0 { + return } - if limits.MaxMemory > 0 { - js.memReserved -= limits.MaxMemory + + js.mu.Lock() + switch cfg.Storage { + case MemoryStorage: + js.memReserved -= cfg.MaxBytes + case FileStorage: + js.storeReserved -= cfg.MaxBytes } - if limits.MaxStore > 0 { - js.storeReserved -= limits.MaxStore + s, clustered := js.srv, !js.standAlone + js.mu.Unlock() + // If clustered send an update to the system immediately. + if clustered { + s.sendStatszUpdate() } - return nil } const ( diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b6d4e4a4..a7fc2fd4 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -652,8 +652,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // If we are here we have received this request over a non client connection. // We need to make sure not to block. We will spin a Go routine per but also make // sure we do not have too many outstanding. - if apiOut := atomic.AddInt64(&js.apiCalls, 1); apiOut > maxJSApiOut { - atomic.AddInt64(&js.apiCalls, -1) + if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut { + atomic.AddInt64(&js.apiInflight, -1) ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) if err == nil { resp := &ApiResponse{Type: JSApiOverloadedType, Error: NewJSInsufficientResourcesError()} @@ -678,7 +678,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Dispatch the API call to its own Go routine. 
go func() { jsub.icb(sub, client, acc, subject, reply, rmsg) - atomic.AddInt64(&js.apiCalls, -1) + atomic.AddInt64(&js.apiInflight, -1) }() } @@ -817,7 +817,9 @@ func (a *Account) trackAPI() { jsa.usage.api++ jsa.apiTotal++ jsa.sendClusterUsageUpdate() + js := jsa.js jsa.mu.Unlock() + atomic.AddInt64(&js.apiTotal, 1) } } @@ -834,6 +836,7 @@ func (a *Account) trackAPIErr() { jsa.sendClusterUsageUpdate() js := jsa.js jsa.mu.Unlock() + atomic.AddInt64(&js.apiTotal, 1) atomic.AddInt64(&js.apiErrors, 1) } } @@ -1113,7 +1116,7 @@ func (s *Server) jsonResponse(v interface{}) string { } // Request to create a stream. -func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) { +func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { return } @@ -1286,6 +1289,14 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, a *Account, } } + // Check for MaxBytes required. + if acc.maxBytesRequired() && cfg.MaxBytes <= 0 { + resp.Error = NewJSStreamMaxBytesRequiredError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // Hand off to cluster for processing. if s.JetStreamIsClustered() { s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg) return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c3e9b9d5..ef5f36ee 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -831,6 +831,10 @@ func (js *jetStream) monitorCluster() { } } case isLeader = <-lch: + // We want to make sure we are updated on statsz so ping the extended cluster. + if isLeader { + s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) + } js.processLeaderChange(isLeader) if isLeader && !beenLeader { beenLeader = true @@ -2167,6 +2171,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { js.mu.Lock() if node := sa.Group.node; node != nil { node.ProposeRemovePeer(ourID) + didRemove = true } sa.Group.node = nil sa.err = nil @@ -3354,6 +3359,11 @@ type streamAssignmentResult struct { Update bool `json:"is_update,omitempty"` } +// Determine if this is an insufficient resources error type. +func isInsufficientResourcesErr(err *ApiError) bool { + return err != nil && IsNatsErr(err, JSInsufficientResourcesErr, JSMemoryResourcesExceededErr, JSStorageResourcesExceededErr) +} + // Process error results of stream and consumer assignments. // Success will be handled by stream leader. func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { @@ -3375,6 +3385,30 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // FIXME(dlc) - suppress duplicates? if sa := js.streamAssignment(result.Account, result.Stream); sa != nil { + canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second + + // See if we should retry in case this cluster is full but there are others. + if cfg, ci := sa.Config, sa.Client; cfg != nil && ci != nil && isInsufficientResourcesErr(result.Response.Error) && canDelete { + // If cluster is defined we can not retry. + if cfg.Placement == nil || cfg.Placement.Cluster == _EMPTY_ { + // If we have additional clusters to try we can retry. 
+ if ci != nil && len(ci.Alternates) > 0 { + if rg := js.createGroupForStream(ci, cfg); rg != nil { + s.Warnf("Retrying cluster placement for stream '%s > %s'", result.Account, result.Stream) + // Pick a new preferred leader. + rg.setPreferred() + // Get rid of previous attempt. + cc.meta.Propose(encodeDeleteStreamAssignment(sa)) + // Propose new. + sa.Group, sa.err = rg, nil + cc.meta.Propose(encodeAddStreamAssignment(sa)) + return + } + } + } + } + + // Respond to the user here. var resp string if result.Response != nil { resp = s.jsonResponse(result.Response) @@ -3385,11 +3419,8 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client sa.responded = true js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp) } - // Here we will remove this assignment, so this needs to only execute when we are sure - // this is what we want to do. - // TODO(dlc) - Could have mixed results, should track per peer. - // Set sa.err while we are deleting so we will not respond to list/names requests. - if !result.Update && time.Since(sa.Created) < 5*time.Second { + // Remove this assignment if possible. + if canDelete { sa.err = NewJSClusterNotAssignedError() cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } @@ -3562,31 +3593,77 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe } // selectPeerGroup will select a group of peers to start a raft group. -// TODO(dlc) - For now randomly select. Can be way smarter. -func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string) []string { - var nodes []string - peers := cc.meta.Peers() - s := cc.s +func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig) []string { + if cluster == _EMPTY_ || cfg == nil { + return nil + } + + var maxBytes uint64 + if cfg != nil && cfg.MaxBytes > 0 { + maxBytes = uint64(cfg.MaxBytes) + } + + // Used for weighted sorting based on availability. 
+	type wn struct {
+		id    string
+		avail uint64
+	}
+
+	var nodes []wn
+	s, peers := cc.s, cc.meta.Peers()
 	for _, p := range peers {
-		// If we know its offline or it is not in our list it probably shutdown, so don't consider.
-		if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(nodeInfo).offline {
+		si, ok := s.nodeToInfo.Load(p.ID)
+		if !ok || si == nil {
 			continue
 		}
-		if cluster != _EMPTY_ {
-			if s.clusterNameForNode(p.ID) == cluster {
-				nodes = append(nodes, p.ID)
-			}
-		} else {
-			nodes = append(nodes, p.ID)
+		ni := si.(nodeInfo)
+		// Only select from the designated named cluster.
+		// If we know its offline or we do not have config or stats don't consider.
+		if ni.cluster != cluster || ni.offline || ni.cfg == nil || ni.stats == nil {
+			continue
 		}
+
+		var available uint64
+		switch cfg.Storage {
+		case MemoryStorage:
+			used := ni.stats.ReservedMemory
+			if ni.stats.Memory > used {
+				used = ni.stats.Memory
+			}
+			if ni.cfg.MaxMemory > int64(used) {
+				available = uint64(ni.cfg.MaxMemory) - used
+			}
+		case FileStorage:
+			used := ni.stats.ReservedStore
+			if ni.stats.Store > used {
+				used = ni.stats.Store
+			}
+			if ni.cfg.MaxStore > int64(used) {
+				available = uint64(ni.cfg.MaxStore) - used
+			}
+		}
+
+		// Check if we have enough room if maxBytes set.
+		if maxBytes > 0 && maxBytes > available {
+			continue
+		}
+		// Add to our list of potential nodes.
+		nodes = append(nodes, wn{p.ID, available})
 	}
+
+	// If we could not select enough peers, fail.
 	if len(nodes) < r {
 		return nil
 	}
-	// Don't depend on range to randomize.
-	rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
-	return nodes[:r]
+	// Sort based on available from most to least. 
+ sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail }) + + var results []string + for _, r := range nodes[:r] { + results = append(results, r.id) + } + return results } func groupNameForStream(peers []string, storage StorageType) string { @@ -3609,22 +3686,33 @@ func groupName(prefix string, peers []string, storage StorageType) string { // createGroupForStream will create a group for assignment for the stream. // Lock should be held. -func (cc *jetStreamCluster) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { +func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { replicas := cfg.Replicas if replicas == 0 { replicas = 1 } - cluster := ci.Cluster - if cfg.Placement != nil && cfg.Placement.Cluster != _EMPTY_ { + + // Default connected cluster from the request origin. + cc, cluster := js.cluster, ci.Cluster + // If specified, override the default. + clusterDefined := cfg.Placement != nil && cfg.Placement.Cluster != _EMPTY_ + if clusterDefined { cluster = cfg.Placement.Cluster } - // Need to create a group here. - // TODO(dlc) - Can be way smarter here. - peers := cc.selectPeerGroup(replicas, cluster) - if len(peers) == 0 { - return nil + clusters := []string{cluster} + if !clusterDefined { + clusters = append(clusters, ci.Alternates...) } - return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers} + + // Need to create a group here. 
+ for _, cn := range clusters { + peers := cc.selectPeerGroup(replicas, cn, cfg) + if len(peers) < replicas { + continue + } + return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers} + } + return nil } func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) { @@ -3670,8 +3758,8 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, return } - // Check for stream limits here before proposing. - if err := jsa.checkLimits(cfg); err != nil { + // Check for account limits here before proposing. + if err := jsa.checkAccountLimits(cfg); err != nil { resp.Error = NewJSStreamLimitsError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return @@ -3681,6 +3769,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, js.mu.Lock() defer js.mu.Unlock() + // If this stream already exists, turn this into a stream info call. if sa := js.streamAssignment(acc.Name, cfg.Name); sa != nil { // If they are the same then we will forward on as a stream info request. // This now matches single server behavior. @@ -3700,8 +3789,9 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return + } - } else if cfg.Sealed { + if cfg.Sealed { resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return @@ -3721,7 +3811,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } // Raft group selection and placement. 
- rg := cc.createGroupForStream(ci, cfg) + rg := js.createGroupForStream(ci, cfg) if rg == nil { resp.Error = NewJSInsufficientResourcesError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) @@ -3873,7 +3963,12 @@ func (s *Server) jsClusteredStreamPurgeRequest( s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } -func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredStreamRestoreRequest( + ci *ClientInfo, + acc *Account, + req *JSApiStreamRestoreRequest, + stream, subject, reply string, rmsg []byte) { + js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return @@ -3892,7 +3987,7 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r } // Raft group selection and placement. - rg := cc.createGroupForStream(ci, cfg) + rg := js.createGroupForStream(ci, cfg) if rg == nil { resp.Error = NewJSInsufficientResourcesError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index cfe2f16a..8b5453da 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -143,9 +143,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { Replicas: 2, MaxBytes: 15 * 1024 * 1024, }) - if err == nil || !strings.Contains(err.Error(), "insufficient storage") { - t.Fatalf("Expected %v but got %v", ApiErrors[JSStorageResourcesExceededErr], err) - } + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) } func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { @@ -966,20 +964,17 @@ func TestJetStreamClusterMaxBytesForStream(t *testing.T) { // Stream config. cfg := &nats.StreamConfig{ Name: "TEST", - Subjects: []string{"foo", "bar"}, Replicas: 2, + MaxBytes: 2 * 1024 * 1024 * 1024, // 2GB } - // 2GB - cfg.MaxBytes = 2 * 1024 * 1024 * 1024 - if _, err := js.AddStream(cfg); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + _, err = js.AddStream(cfg) + require_NoError(t, err) + // Make sure going over the single server limit though is enforced (for now). + cfg.Name = "TEST2" cfg.MaxBytes *= 2 _, err = js.AddStream(cfg) - if err == nil || !strings.Contains(err.Error(), "insufficient storage resources") { - t.Fatalf("Expected %q error, got %q", "insufficient storage resources", err.Error()) - } + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) } func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { @@ -9764,6 +9759,146 @@ func TestJetStreamSuperClusterPushConsumerInterest(t *testing.T) { } } +func TestJetStreamClusterOverflowPlacement(t *testing.T) { + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3) + defer sc.shutdown() + + pcn := "C2" + s := sc.clusterForName(pcn).randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // With this setup, we opted in for requiring MaxBytes, so this should error. 
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "foo",
+		Replicas: 3,
+	})
+	require_Error(t, err, NewJSStreamMaxBytesRequiredError())
+
+	// R=2 on purpose to leave one server empty.
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:     "foo",
+		Replicas: 2,
+		MaxBytes: 2 * 1024 * 1024 * 1024,
+	})
+	require_NoError(t, err)
+
+	// Now try to add another that will overflow the current cluster's reservation.
+	// Since we asked explicitly for the same cluster this should fail.
+	// Note this will not be testing the peer picker since the update has probably not made it to the meta leader.
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:      "bar",
+		Replicas:  3,
+		MaxBytes:  2 * 1024 * 1024 * 1024,
+		Placement: &nats.Placement{Cluster: pcn},
+	})
+	require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError())
+
+	// Now test actual overflow placement. So try again with no placement designation.
+	// This will test the peer picker's logic since they are updated at this point and the meta leader
+	// knows it can not place it in C2.
+	si, err := js.AddStream(&nats.StreamConfig{
+		Name:     "bar",
+		Replicas: 3,
+		MaxBytes: 2 * 1024 * 1024 * 1024,
+	})
+	require_NoError(t, err)
+
+	// Make sure we did not get placed into C2.
+	falt := si.Cluster.Name
+	if falt == pcn {
+		t.Fatalf("Expected to be placed in another cluster besides %q, but got %q", pcn, falt)
+	}
+
+	// One more time that should spill over again to our last cluster.
+	si, err = js.AddStream(&nats.StreamConfig{
+		Name:     "baz",
+		Replicas: 3,
+		MaxBytes: 2 * 1024 * 1024 * 1024,
+	})
+	require_NoError(t, err)
+
+	// Make sure we did not get placed into C2.
+	if salt := si.Cluster.Name; salt == pcn || salt == falt {
+		t.Fatalf("Expected to be placed in last cluster besides %q or %q, but got %q", pcn, falt, salt)
+	}
+
+	// Now place a stream of R1 into C2 which should have space. 
+ si, err = js.AddStream(&nats.StreamConfig{ + Name: "dlc", + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + + if si.Cluster.Name != pcn { + t.Fatalf("Expected to be placed in our origin cluster %q, but got %q", pcn, si.Cluster.Name) + } +} + +func TestJetStreamClusterConcurrentOverflow(t *testing.T) { + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3) + defer sc.shutdown() + + pcn := "C2" + + startCh := make(chan bool) + var wg sync.WaitGroup + var swg sync.WaitGroup + + start := func(name string) { + wg.Add(1) + defer wg.Done() + + s := sc.clusterForName(pcn).randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + swg.Done() + <-startCh + + _, err := js.AddStream(&nats.StreamConfig{ + Name: name, + Replicas: 3, + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + } + + swg.Add(2) + go start("foo") + go start("bar") + swg.Wait() + // Now start both at same time. + close(startCh) + wg.Wait() +} + +func TestJetStreamClusterBalancedPlacement(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesTempl, "CB", 5) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // We have 10GB (2GB X 5) available. + // Use MaxBytes for ease of test (used works too) and place 5 1GB streams with R=2. + for i := 1; i <= 5; i++ { + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("S-%d", i), + Replicas: 2, + MaxBytes: 1 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + } + // Make sure the next one fails properly. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "FAIL", + Replicas: 2, + MaxBytes: 1 * 1024 * 1024 * 1024, + }) + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) +} + // Support functions // Used to setup superclusters for tests. @@ -9829,6 +9964,36 @@ var jsClusterTempl = ` accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" 
} ] } } ` +var jsClusterMaxBytesTempl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + no_auth_user: u + + accounts { + $U { + users = [ { user: "u", pass: "p" } ] + jetstream: { + max_mem: 128MB + max_file: 8GB + max_bytes: true // Forces streams to indicate max_bytes. + } + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + var jsSuperClusterTempl = ` %s gateway { @@ -10733,10 +10898,10 @@ func (c *cluster) waitOnLeader() { expires := time.Now().Add(40 * time.Second) for time.Now().Before(expires) { if leader := c.leader(); leader != nil { - time.Sleep(100 * time.Millisecond) + time.Sleep(250 * time.Millisecond) return } - time.Sleep(25 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } c.t.Fatalf("Expected a cluster leader, got none") @@ -10751,7 +10916,7 @@ func (c *cluster) waitOnClusterReady() { func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) { c.t.Helper() var leader *Server - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(40 * time.Second) for time.Now().Before(expires) { if leader = c.leader(); leader != nil { break @@ -10761,10 +10926,10 @@ func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) { // Now make sure we have all peers. 
for leader != nil && time.Now().Before(expires) { if len(leader.JetStreamClusterPeers()) == numPeersExpected { - time.Sleep(100 * time.Millisecond) + time.Sleep(250 * time.Millisecond) return } - time.Sleep(50 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } peersSeen := len(leader.JetStreamClusterPeers()) diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 9eaf94a5..2b105f28 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -263,6 +263,9 @@ const ( // JSStreamLimitsErrF General stream limits exceeded error string ({err}) JSStreamLimitsErrF ErrorIdentifier = 10053 + // JSStreamMaxBytesRequired account requires a stream config to have max bytes set + JSStreamMaxBytesRequired ErrorIdentifier = 10113 + // JSStreamMessageExceedsMaximumErr message size exceeds maximum allowed JSStreamMessageExceedsMaximumErr ErrorIdentifier = 10054 @@ -427,6 +430,7 @@ var ( JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"}, JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"}, JSStreamLimitsErrF: {Code: 500, ErrCode: 10053, Description: "{err}"}, + JSStreamMaxBytesRequired: {Code: 400, ErrCode: 10113, Description: "account requires a stream config to have max bytes set"}, JSStreamMessageExceedsMaximumErr: {Code: 400, ErrCode: 10054, Description: "message size exceeds maximum allowed"}, JSStreamMirrorNotUpdatableErr: {Code: 400, ErrCode: 10055, Description: "Mirror configuration can not be updated"}, JSStreamMismatchErr: {Code: 400, ErrCode: 10056, Description: "stream name in subject does not match request"}, @@ -1457,6 +1461,16 @@ func NewJSStreamLimitsError(err error, opts ...ErrorOption) *ApiError { } } +// NewJSStreamMaxBytesRequiredError creates a new JSStreamMaxBytesRequired error: "account requires a stream config to have max bytes set" +func 
NewJSStreamMaxBytesRequiredError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSStreamMaxBytesRequired] +} + // NewJSStreamMessageExceedsMaximumError creates a new JSStreamMessageExceedsMaximumErr error: "message size exceeds maximum allowed" func NewJSStreamMessageExceedsMaximumError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/monitor.go b/server/monitor.go index bd66c3cf..1f8c484b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2019 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2417,11 +2417,10 @@ type JSInfo struct { Disabled bool `json:"disabled,omitempty"` Config JetStreamConfig `json:"config,omitempty"` JetStreamStats - APICalls int64 `json:"current_api_calls"` - Streams int `json:"total_streams,omitempty"` - Consumers int `json:"total_consumers,omitempty"` - Messages uint64 `json:"total_messages,omitempty"` - Bytes uint64 `json:"total_message_bytes,omitempty"` + Streams int `json:"streams"` + Consumers int `json:"consumers"` + Messages uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` // aggregate raft info @@ -2576,7 +2575,6 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { accounts = append(accounts, info) } s.js.mu.RUnlock() - jsi.APICalls = atomic.LoadInt64(&s.js.apiCalls) if mg := s.js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { diff --git a/server/monitor_test.go b/server/monitor_test.go index 9b0fb508..c4b75903 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2020 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under 
the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -4012,6 +4012,7 @@ func TestMonitorJsz(t *testing.T) { } ACC { users [{user: usr, password: pwd}] + // In clustered mode, these reservations will not impact any one server. jetstream: {max_store: 4Mb, max_memory: 5Mb} } BCC_TO_HAVE_ONE_EXTRA { @@ -4087,12 +4088,6 @@ func TestMonitorJsz(t *testing.T) { if info.Messages != 1 { t.Fatalf("expected one message but got %d", info.Messages) } - if info.ReservedStore != 4*1024*1024 { - t.Fatalf("expected 4Mb reserved, got %d bytes", info.ReservedStore) - } - if info.ReservedMemory != 5*1024*1024 { - t.Fatalf("expected 5Mb reserved, got %d bytes", info.ReservedStore) - } } }) t.Run("accounts", func(t *testing.T) { @@ -4207,110 +4202,6 @@ func TestMonitorJsz(t *testing.T) { }) } -func TestMonitorJszAccountReserves(t *testing.T) { - readJsInfo := func(url string) *JSInfo { - t.Helper() - body := readBody(t, url) - info := &JSInfo{} - err := json.Unmarshal(body, info) - require_NoError(t, err) - return info - } - tmpDir := createDir(t, "srv") - defer removeDir(t, tmpDir) - tmplCfg := ` - listen: 127.0.0.1:-1 - http_port: 7501 - system_account: SYS - jetstream: { - store_dir: %s - } - accounts { - SYS { users [{user: sys, password: pwd}] } - ACC { - users [{user: usr, password: pwd}] - %s - } - }` - - conf := createConfFile(t, []byte(fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled"))) - defer removeFile(t, conf) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - checkForJSClusterUp(t, s) - - nc := natsConnect(t, fmt.Sprintf("nats://usr:pwd@127.0.0.1:%d", s.opts.Port)) - defer nc.Close() - js, err := nc.JetStream(nats.MaxWait(5 * time.Second)) - require_NoError(t, err) - for _, v := range []struct { - subject string - storage nats.StorageType - }{ - {"file", nats.FileStorage}, - {"mem", nats.MemoryStorage}} { - _, err = js.AddStream(&nats.StreamConfig{ - Name: 
v.subject, - Subjects: []string{v.subject}, - Replicas: 1, - Storage: v.storage, - }) - require_NoError(t, err) - require_NoError(t, nc.Flush()) - } - - send := func() { - for _, subj := range []string{"file", "mem"} { - _, err = js.Publish(subj, []byte("hello world "+subj)) - require_NoError(t, err) - } - require_NoError(t, nc.Flush()) - } - - test := func(msgs, reservedMemory, reservedStore uint64, totalIsReserveUsd bool) { - t.Helper() - info := readJsInfo(fmt.Sprintf("http://127.0.0.1:%d/jsz", s.opts.HTTPPort)) - if info.Streams != 2 { - t.Fatalf("expected stream count to be 1 but got %d", info.Streams) - } - if info.Messages != msgs { - t.Fatalf("expected one message but got %d", info.Messages) - } - if info.ReservedStore != reservedStore { - t.Fatalf("expected %d bytes reserved, got %d bytes", reservedStore, info.ReservedStore) - } - if info.ReservedMemory != reservedMemory { - t.Fatalf("expected %d bytes reserved, got %d bytes", reservedMemory, info.ReservedStore) - } - if info.Memory == 0 { - t.Fatalf("memory expected to be not 0") - } - if info.Store == 0 { - t.Fatalf("store expected to be not 0") - } - memory, store := uint64(0), uint64(0) - if totalIsReserveUsd { - memory = info.Memory - store = info.Store - } - if info.ReservedMemoryUsed != memory { - t.Fatalf("expected %d bytes reserved memory used, got %d bytes", memory, info.ReservedMemoryUsed) - } - if info.ReserveStoreUsed != store { - t.Fatalf("expected %d bytes reserved store used, got %d bytes", store, info.ReserveStoreUsed) - } - } - - send() - test(2, 0, 0, false) - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: {max_mem: 4Mb, max_store: 5Mb}")) - test(2, 4*1024*1024, 5*1024*1024, true) - send() - test(4, 4*1024*1024, 5*1024*1024, true) - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled")) - test(4, 0, 0, false) -} - func TestMonitorReloadTLSConfig(t *testing.T) { template := ` listen: "127.0.0.1:-1" diff --git a/server/mqtt_test.go 
b/server/mqtt_test.go
index e39cf0af..4a9096b0 100644
--- a/server/mqtt_test.go
+++ b/server/mqtt_test.go
@@ -3164,6 +3164,8 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) {
 	checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
 		for _, s := range cluster {
 			if s.JetStreamIsLeader() {
+				// Need to wait for usage updates now to propagate to meta leader.
+				time.Sleep(250 * time.Millisecond)
 				return nil
 			}
 		}
@@ -3180,13 +3182,9 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) {
 			lno.Accounts = append(lno.Accounts, NewAccount("unused-account"))
 			fallthrough
 		case 1:
-			lno.JsAccDefaultDomain = map[string]string{
-				"$G": "",
-			}
+			lno.JsAccDefaultDomain = map[string]string{"$G": ""}
 		case 2:
-			lno.JsAccDefaultDomain = map[string]string{
-				"$G": o1.JetStreamDomain,
-			}
+			lno.JsAccDefaultDomain = map[string]string{"$G": o1.JetStreamDomain}
 		case 3:
 			// turn off jetstream in $G by adding another account and set mqtt domain option
 			lno.Accounts = append(lno.Accounts, NewAccount("unused-account"))
diff --git a/server/opts.go b/server/opts.go
index 7e05afe2..044ce294 100644
--- a/server/opts.go
+++ b/server/opts.go
@@ -1,4 +1,4 @@
-// Copyright 2012-2021 The NATS Authors
+// Copyright 2012-2022 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -1601,7 +1601,7 @@ func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error)
 	return nil
 }
-var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1}
+var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1, false}
 // Parses jetstream account limits for an account. Simple setup with boolean is allowed, and we will
 // use dynamic account limits. 
@@ -1626,7 +1626,7 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn
 			return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)}
 		}
 	case map[string]interface{}:
-		jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1}
+		jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1, false}
 		for mk, mv := range vv {
 			tk, mv = unwrapValue(mv, &lt)
 			switch strings.ToLower(mk) {
@@ -1654,6 +1654,12 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn
 					return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
 				}
 				jsLimits.MaxConsumers = int(vv)
+			case "max_bytes_required", "max_stream_bytes", "max_bytes":
+				vv, ok := mv.(bool)
+				if !ok {
+					return &configErr{tk, fmt.Sprintf("Expected a parseable bool for %q, got %v", mk, mv)}
+				}
+				jsLimits.MaxBytesRequired = bool(vv)
 			default:
 				if !tk.IsUsedVariable() {
 					err := &unknownConfigFieldErr{
diff --git a/server/route.go b/server/route.go
index 652c0c09..2942912d 100644
--- a/server/route.go
+++ b/server/route.go
@@ -1,4 +1,4 @@
-// Copyright 2013-2020 The NATS Authors
+// Copyright 2013-2022 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -1422,7 +1422,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
 		s.remotes[id] = c
 		// check to be consistent and future proof. 
but will be same domain if s.sameDomain(info.Domain) { - s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, info.Domain, id, false, info.JetStream}) + s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, info.Domain, id, nil, nil, false, info.JetStream}) } c.mu.Lock() c.route.connectURLs = info.ClientConnectURLs diff --git a/server/server.go b/server/server.go index 8b5e224c..6e3ded5e 100644 --- a/server/server.go +++ b/server/server.go @@ -1,4 +1,4 @@ -// Copyright 2012-2021 The NATS Authors +// Copyright 2012-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -274,6 +274,8 @@ type nodeInfo struct { cluster string domain string id string + cfg *JetStreamConfig + stats *JetStreamStats offline bool js bool } @@ -384,9 +386,19 @@ func NewServer(opts *Options) (*Server, error) { s.mu.Lock() defer s.mu.Unlock() - // Place ourselves in some lookup maps. - ourNode := string(getHash(serverName)) - s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, opts.JetStreamDomain, info.ID, false, opts.JetStream}) + // Place ourselves in the JetStream nodeInfo if needed. + if opts.JetStream { + ourNode := string(getHash(serverName)) + s.nodeToInfo.Store(ourNode, nodeInfo{ + serverName, + opts.Cluster.Name, + opts.JetStreamDomain, + info.ID, + &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore}, + nil, + false, true, + }) + } s.routeResolver = opts.Cluster.resolver if s.routeResolver == nil { @@ -560,6 +572,11 @@ func (s *Server) isClusterNameDynamic() bool { return s.getOpts().Cluster.Name == _EMPTY_ } +// Returns our configured serverName. +func (s *Server) serverName() string { + return s.getOpts().ServerName +} + // ClientURL returns the URL used to connect clients. 
Helpful in testing // when we designate a random client port (-1). func (s *Server) ClientURL() string { diff --git a/server/stream.go b/server/stream.go index aeebb423..a1cc58ec 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -314,8 +314,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, ApiErrors[JSStreamNameExistErr] } } - // Check for limits. - if err := jsa.checkLimits(&cfg); err != nil { + // Check for account and server limits. + if err := jsa.checkAllLimits(&cfg); err != nil { jsa.mu.Unlock() return nil, err } @@ -442,6 +442,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Setup our internal send go routine. mset.setupSendCapabilities() + // Reserve resources if MaxBytes present. + mset.js.reserveStreamResources(&mset.cfg) + // Call directly to set leader if not in clustered mode. // This can be called though before we actually setup clustering, so check both. if singleServerMode { @@ -965,7 +968,7 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, } // Check limits. - if err := jsa.checkLimits(&cfg); err != nil { + if err := jsa.checkAllLimits(&cfg); err != nil { return nil, err } return &cfg, nil @@ -3319,7 +3322,7 @@ func (mset *stream) delete() error { // Internal function to stop or delete the stream. 
func (mset *stream) stop(deleteFlag, advisory bool) error { mset.mu.RLock() - jsa := mset.jsa + js, jsa := mset.js, mset.jsa mset.mu.RUnlock() if jsa == nil { @@ -3427,6 +3430,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if err := mset.store.Delete(); err != nil { return err } + js.releaseStreamResources(&mset.cfg) } else if err := mset.store.Stop(); err != nil { return err }