From 52da55c8c68d5e3dd00e1fc24896cad04bcc8250 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Jan 2022 12:59:39 -0800 Subject: [PATCH] Implement overflow placement for JetStream streams. This allows stream placement to overflow to adjacent clusters. We also do more balanced placement based on resources (store or mem). We can continue to expand this as well. We also introduce an account requirement that stream configs contain a MaxBytes value. We now track account limits and server limits more distinctly, and do not reserver server resources based on account limits themselves. Signed-off-by: Derek Collison --- server/client.go | 55 ++++-- server/consumer.go | 8 +- server/errors.json | 12 +- server/events.go | 95 +++++++--- server/jetstream.go | 263 ++++++++++++++++----------- server/jetstream_api.go | 21 ++- server/jetstream_cluster.go | 169 +++++++++++++---- server/jetstream_cluster_test.go | 201 ++++++++++++++++++-- server/jetstream_errors_generated.go | 14 ++ server/monitor.go | 12 +- server/monitor_test.go | 113 +----------- server/mqtt_test.go | 10 +- server/opts.go | 12 +- server/route.go | 4 +- server/server.go | 25 ++- server/stream.go | 14 +- 16 files changed, 676 insertions(+), 352 deletions(-) diff --git a/server/client.go b/server/client.go index 9670fd58..b1edd1e4 100644 --- a/server/client.go +++ b/server/client.go @@ -3948,11 +3948,19 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil { ci = &cis ci.Service = acc.Name + // Check if we are moving into a share details account from a non-shared + // and add in server and cluster details. + if !share && si.share { + c.addServerAndClusterInfo(ci) + } } } else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 { ci = c.getClientInfo(share) + } else if c.kind == LEAF && si.share { + // We have a leaf header here for ci, augment as above. 
+ ci = c.getClientInfo(si.share) } - + // Set clientInfo if present. if ci != nil { if b, _ := json.Marshal(ci); b != nil { msg = c.setHeader(ClientInfoHdr, string(b), msg) @@ -5034,31 +5042,52 @@ func (ci *ClientInfo) serviceAccount() string { return ci.Account } +// Add in our server and cluster information to this client info. +func (c *client) addServerAndClusterInfo(ci *ClientInfo) { + if ci == nil { + return + } + // Server + if c.kind != LEAF { + ci.Server = c.srv.Name() + } else if c.kind == LEAF { + ci.Server = c.leaf.remoteServer + } + // Cluster + ci.Cluster = c.srv.cachedClusterName() + // If we have gateways fill in cluster alternates. + // These will be in RTT asc order. + if c.srv.gateway.enabled { + var gws []*client + c.srv.getOutboundGatewayConnections(&gws) + for _, c := range gws { + c.mu.Lock() + cn := c.gw.name + c.mu.Unlock() + ci.Alternates = append(ci.Alternates, cn) + } + } +} + // Grabs the information for this client. func (c *client) getClientInfo(detailed bool) *ClientInfo { - if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM) { + if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM && c.kind != ACCOUNT) { return nil } - // Server name. Defaults to server ID if not set explicitly. - var cn, sn string + // Result + var ci ClientInfo + if detailed { - if c.kind != LEAF { - sn = c.srv.Name() - } - cn = c.srv.cachedClusterName() + c.addServerAndClusterInfo(&ci) } c.mu.Lock() - var ci ClientInfo // RTT and Account are always added. ci.Account = accForClient(c) ci.RTT = c.rtt // Detailed signals additional opt in. 
if detailed { - if c.kind == LEAF { - sn = c.leaf.remoteServer - } ci.Start = &c.start ci.Host = c.host ci.ID = c.cid @@ -5066,8 +5095,6 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo { ci.User = c.getRawAuthUser() ci.Lang = c.opts.Lang ci.Version = c.opts.Version - ci.Server = sn - ci.Cluster = cn ci.Jwt = c.opts.JWT ci.IssuerKey = issuerForClient(c) ci.NameTag = c.nameTag diff --git a/server/consumer.go b/server/consumer.go index 29781569..90dba309 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2104,8 +2104,8 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, sendBatch(&wr) } else { // Check for API outstanding requests. - if apiOut := atomic.AddInt64(&js.apiCalls, 1); apiOut > maxJSApiOut { - atomic.AddInt64(&js.apiCalls, -1) + if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut { + atomic.AddInt64(&js.apiInflight, -1) sendErr(503, "JetStream API limit exceeded") s.Warnf("JetStream API limit exceeded: %d calls outstanding", apiOut) return @@ -2115,7 +2115,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, o.mu.Lock() sendBatch(&wr) o.mu.Unlock() - atomic.AddInt64(&js.apiCalls, -1) + atomic.AddInt64(&js.apiInflight, -1) }() } } diff --git a/server/errors.json b/server/errors.json index 5a2e857c..a9e74658 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1108,5 +1108,15 @@ "help": "Returned when the delivery subject on a Push Consumer is not a valid NATS Subject", "url": "", "deprecates": "" + }, + { + "constant": "JSStreamMaxBytesRequired", + "code": 400, + "error_code": 10113, + "description": "account requires a stream config to have max bytes set", + "comment": "", + 
"help": "", + "url": "", + "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/events.go b/server/events.go index d1b14e4e..8454a0b4 100644 --- a/server/events.go +++ b/server/events.go @@ -1,4 +1,4 @@ -// Copyright 2018-2021 The NATS Authors +// Copyright 2018-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -181,6 +181,7 @@ type ClientInfo struct { RTT time.Duration `json:"rtt,omitempty"` Server string `json:"server,omitempty"` Cluster string `json:"cluster,omitempty"` + Alternates []string `json:"alts,omitempty"` Stop *time.Time `json:"stop,omitempty"` Jwt string `json:"jwt,omitempty"` IssuerKey string `json:"issuer_key,omitempty"` @@ -669,6 +670,14 @@ func (s *Server) sendStatsz(subj string) { jStat.Config = &c js.mu.RUnlock() jStat.Stats = js.usageStats() + // Update our own usage since we do not echo so we will not hear ourselves. + ourNode := string(getHash(s.serverName())) + if v, ok := s.nodeToInfo.Load(ourNode); ok && v != nil { + ni := v.(nodeInfo) + ni.stats = jStat.Stats + s.nodeToInfo.Store(ourNode, ni) + } + // Metagroup info. if mg := js.getMetaGroup(); mg != nil { if mg.Leader() { if ci := s.raftNodeToClusterInfo(mg); ci != nil { @@ -695,9 +704,11 @@ func (s *Server) sendStatsz(subj string) { func (s *Server) heartbeatStatsz() { if s.sys.stmr != nil { // Increase after startup to our max. - s.sys.cstatsz *= 4 - if s.sys.cstatsz > s.sys.statsz { - s.sys.cstatsz = s.sys.statsz + if s.sys.cstatsz < s.sys.statsz { + s.sys.cstatsz *= 2 + if s.sys.cstatsz > s.sys.statsz { + s.sys.cstatsz = s.sys.statsz + } } s.sys.stmr.Reset(s.sys.cstatsz) } @@ -714,7 +725,7 @@ func (s *Server) sendStatszUpdate() { func (s *Server) startStatszTimer() { // We will start by sending out more of these and trail off to the statsz being the max. 
s.sys.cstatsz = 250 * time.Millisecond - // Send out the first one after 250ms. + // Send out the first one quickly, we will slowly back off. s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz)) } @@ -1031,10 +1042,10 @@ func (s *Server) processRemoteServerShutdown(sid string) { }) // Update any state in nodeInfo. s.nodeToInfo.Range(func(k, v interface{}) bool { - si := v.(nodeInfo) - if si.id == sid { - si.offline = true - s.nodeToInfo.Store(k, si) + ni := v.(nodeInfo) + if ni.id == sid { + ni.offline = true + s.nodeToInfo.Store(k, ni) return false } return true @@ -1071,12 +1082,14 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, s.Debugf("Received bad server info for remote server shutdown") return } - // Additional processing here. - if !s.sameDomain(si.Domain) { - return - } + + // JetStream node updates if applicable. node := string(getHash(si.Name)) - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, true, true}) + if v, ok := s.nodeToInfo.Load(node); ok && v != nil { + ni := v.(nodeInfo) + ni.offline = true + s.nodeToInfo.Store(node, ni) + } sid := toks[serverSubjectIndex] if su := s.sys.servers[sid]; su != nil { @@ -1095,44 +1108,66 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su return } si := ssm.Server + + // JetStream node updates. if !s.sameDomain(si.Domain) { return } + var cfg *JetStreamConfig + var stats *JetStreamStats + + if ssm.Stats.JetStream != nil { + cfg = ssm.Stats.JetStream.Config + stats = ssm.Stats.JetStream.Stats + } + node := string(getHash(si.Name)) - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, false, si.JetStream}) + s.nodeToInfo.Store(node, nodeInfo{ + si.Name, + si.Cluster, + si.Domain, + si.ID, + cfg, + stats, + false, si.JetStream, + }) } // updateRemoteServer is called when we have an update from a remote server. 
// This allows us to track remote servers, respond to shutdown messages properly, // make sure that messages are ordered, and allow us to prune dead servers. // Lock should be held upon entry. -func (s *Server) updateRemoteServer(ms *ServerInfo) { - su := s.sys.servers[ms.ID] +func (s *Server) updateRemoteServer(si *ServerInfo) { + su := s.sys.servers[si.ID] if su == nil { - s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()} - s.processNewServer(ms) + s.sys.servers[si.ID] = &serverUpdate{si.Seq, time.Now()} + s.processNewServer(si) } else { // Should always be going up. - if ms.Seq <= su.seq { - s.Errorf("Received out of order remote server update from: %q", ms.ID) + if si.Seq <= su.seq { + s.Errorf("Received out of order remote server update from: %q", si.ID) return } - su.seq = ms.Seq + su.seq = si.Seq su.ltime = time.Now() } } // processNewServer will hold any logic we want to use when we discover a new server. // Lock should be held upon entry. -func (s *Server) processNewServer(ms *ServerInfo) { +func (s *Server) processNewServer(si *ServerInfo) { // Right now we only check if we have leafnode servers and if so send another // connect update to make sure they switch this account to interest only mode. s.ensureGWsInterestOnlyForLeafNodes() + // Add to our nodeToName - if s.sameDomain(ms.Domain) { - node := string(getHash(ms.Name)) - s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.Domain, ms.ID, false, ms.JetStream}) + if s.sameDomain(si.Domain) { + node := string(getHash(si.Name)) + // Only update if non-existent + if _, ok := s.nodeToInfo.Load(node); !ok { + s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, nil, nil, false, si.JetStream}) + } } // Announce ourselves.. s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) @@ -1378,9 +1413,15 @@ type ServerAPIConnzResponse struct { // statszReq is a request for us to respond with current statsz. 
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - if !s.EventsEnabled() || reply == _EMPTY_ { + if !s.EventsEnabled() { return } + + // No reply is a signal that we should use our normal broadcast subject. + if reply == _EMPTY_ { + reply = fmt.Sprintf(serverStatsSubj, s.info.ID) + } + opts := StatszEventOptions{} if _, msg := c.msgParts(rmsg); len(msg) != 0 { if err := json.Unmarshal(msg, &opts); err != nil { diff --git a/server/jetstream.go b/server/jetstream.go index fdb66112..6b62a5f1 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -48,22 +48,22 @@ type JetStreamConfig struct { Domain string `json:"domain,omitempty"` } +// Statistics about JetStream for this server. 
type JetStreamStats struct { - Memory uint64 `json:"memory"` - Store uint64 `json:"storage"` - ReservedMemoryUsed uint64 `json:"reserved_memory_used,omitempty"` - ReserveStoreUsed uint64 `json:"reserved_storage_used,omitempty"` - Accounts int `json:"accounts,omitempty"` - API JetStreamAPIStats `json:"api"` - ReservedMemory uint64 `json:"reserved_memory,omitempty"` - ReservedStore uint64 `json:"reserved_storage,omitempty"` + Memory uint64 `json:"memory"` + Store uint64 `json:"storage"` + ReservedMemory uint64 `json:"reserved_memory"` + ReservedStore uint64 `json:"reserved_storage"` + Accounts int `json:"accounts"` + API JetStreamAPIStats `json:"api"` } type JetStreamAccountLimits struct { - MaxMemory int64 `json:"max_memory"` - MaxStore int64 `json:"max_storage"` - MaxStreams int `json:"max_streams"` - MaxConsumers int `json:"max_consumers"` + MaxMemory int64 `json:"max_memory"` + MaxStore int64 `json:"max_storage"` + MaxStreams int `json:"max_streams"` + MaxConsumers int `json:"max_consumers"` + MaxBytesRequired bool `json:"max_bytes_required"` } // JetStreamAccountStats returns current statistics about the account's JetStream usage. @@ -78,27 +78,28 @@ type JetStreamAccountStats struct { } type JetStreamAPIStats struct { - Total uint64 `json:"total"` - Errors uint64 `json:"errors"` + Total uint64 `json:"total"` + Errors uint64 `json:"errors"` + Inflight uint64 `json:"inflight,omitempty"` } // This is for internal accounting for JetStream for this server. type jetStream struct { // These are here first because of atomics on 32bit systems. 
+ apiInflight int64 + apiTotal int64 + apiErrors int64 memReserved int64 storeReserved int64 - apiCalls int64 - apiErrors int64 - memTotal int64 - storeTotal int64 - memTotalRes int64 - storeTotalRes int64 + memUsed int64 + storeUsed int64 mu sync.RWMutex srv *Server config JetStreamConfig cluster *jetStreamCluster accounts map[string]*jsAccount apiSubs *Sublist + standAlone bool disabled bool oos bool } @@ -306,8 +307,10 @@ func (s *Server) checkStoreDir(cfg *JetStreamConfig) error { // enableJetStream will start up the JetStream subsystem. func (s *Server) enableJetStream(cfg JetStreamConfig) error { + js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()} + s.mu.Lock() - s.js = &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()} + s.js = js s.mu.Unlock() // FIXME(dlc) - Allow memory only operation? @@ -360,19 +363,21 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Debugf(" %s", jsAllAPI) s.setupJetStreamExports() - // Enable accounts and restore state before starting clustering. - if err := s.enableJetStreamAccounts(); err != nil { - return err - } - - canExtend := s.canExtendOtherDomain() - standAlone := s.standAloneMode() + standAlone, canExtend := s.standAloneMode(), s.canExtendOtherDomain() if standAlone && canExtend && s.getOpts().JetStreamExtHint != jsWillExtend { canExtend = false s.Noticef("Standalone server started in clustered mode do not support extending domains") s.Noticef(`Manually disable standalone mode by setting the JetStream Option "extension_hint: %s"`, jsWillExtend) } + // Indicate if we will be standalone for checking resource reservations, etc. + js.setJetStreamStandAlone(standAlone && !canExtend) + + // Enable accounts and restore state before starting clustering. + if err := s.enableJetStreamAccounts(); err != nil { + return err + } + // If we are in clustered mode go ahead and start the meta controller. 
 if !standAlone || canExtend { if err := s.enableJetStreamClustering(); err != nil { return err } } @@ -706,6 +711,16 @@ func (js *jetStream) isEnabled() bool { return !js.disabled } +// Mark that we will be in standalone mode. +func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { + if js == nil { + return + } + js.mu.Lock() + defer js.mu.Unlock() + js.standAlone = isStandAlone +} + // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { var js *jetStream @@ -927,30 +942,30 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { limits = dynamicJSAccountLimits } + a.assignJetStreamLimits(limits) + js := s.getJetStream() if js == nil { - a.assignJetStreamLimits(limits) return NewJSNotEnabledError() } - a.assignJetStreamLimits(limits) - js.mu.Lock() - // Check the limits against existing reservations. if _, ok := js.accounts[a.Name]; ok && a.JetStreamEnabled() { js.mu.Unlock() return fmt.Errorf("jetstream already enabled for account") } + + // Check the limits against existing reservations. if err := js.sufficientResources(limits); err != nil { js.mu.Unlock() return err } + jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq} jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) jsa.storeDir = path.Join(js.config.StoreDir, a.Name) js.accounts[a.Name] = jsa - js.reserveResources(limits) js.mu.Unlock() sysNode := s.Node() @@ -1223,6 +1238,18 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { return nil } +// Return whether or not we require MaxBytes to be set. +func (a *Account) maxBytesRequired() bool { + a.mu.RLock() + defer a.mu.RUnlock() + + jsa := a.js + if jsa == nil { + return false + } + return jsa.limits.MaxBytesRequired +} + // NumStreams will return how many streams we have. 
func (a *Account) numStreams() int { a.mu.RLock() @@ -1293,8 +1320,7 @@ func (a *Account) lookupStream(name string) (*stream, error) { // UpdateJetStreamLimits will update the account limits for a JetStream enabled account. func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { a.mu.RLock() - s := a.srv - jsa := a.js + s, jsa := a.srv, a.js a.mu.RUnlock() if s == nil { @@ -1315,7 +1341,6 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { // Calculate the delta between what we have and what we want. jsa.mu.Lock() dl := diffCheckedLimits(&jsa.limits, limits) - jsaLimits := jsa.limits jsa.mu.Unlock() js.mu.Lock() @@ -1324,23 +1349,6 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { js.mu.Unlock() return err } - // FIXME(dlc) - If we drop and are over the max on memory or store, do we delete?? - js.releaseResources(&jsaLimits) - js.reserveResources(limits) - if jsaLimits.MaxMemory >= 0 && limits.MaxMemory < 0 { - // we had a reserve and are now dropping it - atomic.AddInt64(&js.memTotalRes, -jsa.memTotal) - } else if jsaLimits.MaxMemory < 0 && limits.MaxMemory >= 0 { - // we had no reserve and are now adding it - atomic.AddInt64(&js.memTotalRes, jsa.memTotal) - } - if jsaLimits.MaxStore >= 0 && limits.MaxStore < 0 { - // we had a reserve and are now dropping it - atomic.AddInt64(&js.storeTotalRes, -jsa.storeTotal) - } else if jsaLimits.MaxStore < 0 && limits.MaxStore >= 0 { - // we had no reserve and are now adding it - atomic.AddInt64(&js.storeTotalRes, jsa.storeTotal) - } js.mu.Unlock() // Update @@ -1427,7 +1435,6 @@ func (js *jetStream) disableJetStream(jsa *jsAccount) error { js.mu.Lock() delete(js.accounts, jsa.account.Name) - js.releaseResources(&jsa.limits) js.mu.Unlock() jsa.delete() @@ -1518,17 +1525,11 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) { if storeType == MemoryStorage { jsa.usage.mem += delta jsa.memTotal += delta - 
atomic.AddInt64(&js.memTotal, delta) - if jsa.limits.MaxMemory > 0 { - atomic.AddInt64(&js.memTotalRes, delta) - } + atomic.AddInt64(&js.memUsed, delta) } else { jsa.usage.store += delta jsa.storeTotal += delta - atomic.AddInt64(&js.storeTotal, delta) - if jsa.limits.MaxStore > 0 { - atomic.AddInt64(&js.storeTotalRes, delta) - } + atomic.AddInt64(&js.storeUsed, delta) } // Publish our local updates if in clustered mode. if isClustered { @@ -1550,7 +1551,7 @@ func (jsa *jsAccount) sendClusterUsageUpdateTimer() { // Send updates to our account usage for this server. // Lock should be held. func (jsa *jsAccount) sendClusterUsageUpdate() { - if jsa.js == nil || jsa.js.srv == nil { + if jsa.js == nil || jsa.js.srv == nil || jsa.sendq == nil { return } // These values are absolute so we can limit send rates. @@ -1566,9 +1567,8 @@ func (jsa *jsAccount) sendClusterUsageUpdate() { le.PutUint64(b[8:], uint64(jsa.usage.store)) le.PutUint64(b[16:], uint64(jsa.usage.api)) le.PutUint64(b[24:], uint64(jsa.usage.err)) - if jsa.sendq != nil { - jsa.sendq <- &pubMsg{nil, jsa.updatesPub, _EMPTY_, nil, nil, b, noCompression, false, false} - } + + jsa.sendq <- &pubMsg{nil, jsa.updatesPub, _EMPTY_, nil, nil, b, noCompression, false, false} } func (js *jetStream) wouldExceedLimits(storeType StorageType, sz int) bool { @@ -1577,9 +1577,9 @@ func (js *jetStream) wouldExceedLimits(storeType StorageType, sz int) bool { max int64 ) if storeType == MemoryStorage { - total, max = &js.memTotal, js.config.MaxMemory + total, max = &js.memUsed, js.config.MaxMemory } else { - total, max = &js.storeTotal, js.config.MaxStore + total, max = &js.storeUsed, js.config.MaxStore } return atomic.LoadInt64(total) > (max + int64(sz)) } @@ -1605,9 +1605,19 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool { return false } +// Check account limits. 
+func (jsa *jsAccount) checkAccountLimits(config *StreamConfig) error { + return jsa.checkLimits(config, false) +} + +// Check account and server limits. +func (jsa *jsAccount) checkAllLimits(config *StreamConfig) error { + return jsa.checkLimits(config, true) +} + // Check if a new proposed msg set while exceed our account limits. // Lock should be held. -func (jsa *jsAccount) checkLimits(config *StreamConfig) error { +func (jsa *jsAccount) checkLimits(config *StreamConfig, checkServer bool) error { if jsa.limits.MaxStreams > 0 && len(jsa.streams) >= jsa.limits.MaxStreams { return NewJSMaximumStreamsLimitError() } @@ -1617,13 +1627,13 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error { } // Check storage, memory or disk. - return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas) + return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas, checkServer) } -// Check if additional bytes will exceed our account limits. +// Check if additional bytes will exceed our account limits and optionally the server itself. // This should account for replicas. // Lock should be held. -func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int) error { +func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int, checkServer bool) error { if replicas < 1 { replicas = 1 } @@ -1639,11 +1649,10 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, repl if jsa.memReserved+totalBytes > jsa.limits.MaxMemory { return NewJSMemoryResourcesExceededError() } - } else { - // Account is unlimited, check if this server can handle request. - if js.memReserved+addBytes > js.config.MaxMemory { - return NewJSMemoryResourcesExceededError() - } + } + // Check if this server can handle request. + if checkServer && js.memReserved+addBytes > js.config.MaxMemory { + return NewJSMemoryResourcesExceededError() } case FileStorage: // Account limits defined. 
@@ -1651,11 +1660,10 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, repl if jsa.storeReserved+totalBytes > jsa.limits.MaxStore { return NewJSStorageResourcesExceededError() } - } else { - // Account is unlimited, check if this server can handle request. - if js.storeReserved+addBytes > js.config.MaxStore { - return NewJSStorageResourcesExceededError() - } + } + // Check if this server can handle request. + if checkServer && js.storeReserved+addBytes > js.config.MaxStore { + return NewJSStorageResourcesExceededError() } } @@ -1721,57 +1729,96 @@ func (js *jetStream) usageStats() *JetStreamStats { stats.ReservedMemory = (uint64)(js.memReserved) stats.ReservedStore = (uint64)(js.storeReserved) js.mu.RUnlock() - stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiCalls)) + stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiTotal)) stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors)) - stats.Memory = (uint64)(atomic.LoadInt64(&js.memTotal)) - stats.Store = (uint64)(atomic.LoadInt64(&js.storeTotal)) - stats.ReservedMemoryUsed = (uint64)(atomic.LoadInt64(&js.memTotalRes)) - stats.ReserveStoreUsed = (uint64)(atomic.LoadInt64(&js.storeTotalRes)) + stats.API.Inflight = (uint64)(atomic.LoadInt64(&js.apiInflight)) + stats.Memory = (uint64)(atomic.LoadInt64(&js.memUsed)) + stats.Store = (uint64)(atomic.LoadInt64(&js.storeUsed)) return &stats } // Check to see if we have enough system resources for this account. // Lock should be held. func (js *jetStream) sufficientResources(limits *JetStreamAccountLimits) error { - if limits == nil { + // If we are clustered we do not really know how many resources will be ultimately available. + // This needs to be handled out of band. + // If we are a single server, we can make decisions here. + if limits == nil || !js.standAlone { return nil } + + // Reserved is now specific to the MaxBytes for streams. 
 if js.memReserved+limits.MaxMemory > js.config.MaxMemory { return NewJSMemoryResourcesExceededError() } if js.storeReserved+limits.MaxStore > js.config.MaxStore { return NewJSStorageResourcesExceededError() } + + // Since we know if we are here we are single server mode, check the account reservations. + var storeReserved, memReserved int64 + for _, jsa := range js.accounts { + jsa.mu.RLock() + if jsa.limits.MaxMemory > 0 { + memReserved += jsa.limits.MaxMemory + } + if jsa.limits.MaxStore > 0 { + storeReserved += jsa.limits.MaxStore + } + jsa.mu.RUnlock() + } + + if memReserved+limits.MaxMemory > js.config.MaxMemory { + return NewJSMemoryResourcesExceededError() + } + if storeReserved+limits.MaxStore > js.config.MaxStore { + return NewJSStorageResourcesExceededError() + } + return nil } -// This will (blindly) reserve the respources requested. -// Lock should be held. -func (js *jetStream) reserveResources(limits *JetStreamAccountLimits) error { - if limits == nil { - return nil +// This will reserve the stream resources requested. +// This will spin off of MaxBytes. +func (js *jetStream) reserveStreamResources(cfg *StreamConfig) { + if cfg == nil || cfg.MaxBytes <= 0 { + return } - if limits.MaxMemory > 0 { - js.memReserved += limits.MaxMemory + + js.mu.Lock() + switch cfg.Storage { + case MemoryStorage: + js.memReserved += cfg.MaxBytes + case FileStorage: + js.storeReserved += cfg.MaxBytes } - if limits.MaxStore > 0 { - js.storeReserved += limits.MaxStore + s, clustered := js.srv, !js.standAlone + js.mu.Unlock() + // If clustered send an update to the system immediately. + if clustered { + s.sendStatszUpdate() } - return nil } -// Lock should be held. -func (js *jetStream) releaseResources(limits *JetStreamAccountLimits) error { - if limits == nil { - return nil +// Release reserved resources held by a stream. 
+func (js *jetStream) releaseStreamResources(cfg *StreamConfig) { + if cfg == nil || cfg.MaxBytes <= 0 { + return } - if limits.MaxMemory > 0 { - js.memReserved -= limits.MaxMemory + + js.mu.Lock() + switch cfg.Storage { + case MemoryStorage: + js.memReserved -= cfg.MaxBytes + case FileStorage: + js.storeReserved -= cfg.MaxBytes } - if limits.MaxStore > 0 { - js.storeReserved -= limits.MaxStore + s, clustered := js.srv, !js.standAlone + js.mu.Unlock() + // If clustered send an update to the system immediately. + if clustered { + s.sendStatszUpdate() } - return nil } const ( diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b6d4e4a4..a7fc2fd4 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -652,8 +652,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // If we are here we have received this request over a non client connection. // We need to make sure not to block. We will spin a Go routine per but also make // sure we do not have too many outstanding. - if apiOut := atomic.AddInt64(&js.apiCalls, 1); apiOut > maxJSApiOut { - atomic.AddInt64(&js.apiCalls, -1) + if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut { + atomic.AddInt64(&js.apiInflight, -1) ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) if err == nil { resp := &ApiResponse{Type: JSApiOverloadedType, Error: NewJSInsufficientResourcesError()} @@ -678,7 +678,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Dispatch the API call to its own Go routine. 
go func() { jsub.icb(sub, client, acc, subject, reply, rmsg) - atomic.AddInt64(&js.apiCalls, -1) + atomic.AddInt64(&js.apiInflight, -1) }() } @@ -817,7 +817,9 @@ func (a *Account) trackAPI() { jsa.usage.api++ jsa.apiTotal++ jsa.sendClusterUsageUpdate() + js := jsa.js jsa.mu.Unlock() + atomic.AddInt64(&js.apiTotal, 1) } } @@ -834,6 +836,7 @@ func (a *Account) trackAPIErr() { jsa.sendClusterUsageUpdate() js := jsa.js jsa.mu.Unlock() + atomic.AddInt64(&js.apiTotal, 1) atomic.AddInt64(&js.apiErrors, 1) } } @@ -1113,7 +1116,7 @@ func (s *Server) jsonResponse(v interface{}) string { } // Request to create a stream. -func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) { +func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { return } @@ -1286,6 +1289,14 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, a *Account, } } + // Check for MaxBytes required. + if acc.maxBytesRequired() && cfg.MaxBytes <= 0 { + resp.Error = NewJSStreamMaxBytesRequiredError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // Hand off to cluster for processing. if s.JetStreamIsClustered() { s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg) return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c3e9b9d5..ef5f36ee 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -831,6 +831,10 @@ func (js *jetStream) monitorCluster() { } } case isLeader = <-lch: + // We want to make sure we are updated on statsz so ping the extended cluster. + if isLeader { + s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) + } js.processLeaderChange(isLeader) if isLeader && !beenLeader { beenLeader = true @@ -2167,6 +2171,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { js.mu.Lock() if node := sa.Group.node; node != nil { node.ProposeRemovePeer(ourID) + didRemove = true } sa.Group.node = nil sa.err = nil @@ -3354,6 +3359,11 @@ type streamAssignmentResult struct { Update bool `json:"is_update,omitempty"` } +// Determine if this is an insufficient resources error type. +func isInsufficientResourcesErr(err *ApiError) bool { + return err != nil && IsNatsErr(err, JSInsufficientResourcesErr, JSMemoryResourcesExceededErr, JSStorageResourcesExceededErr) +} + // Process error results of stream and consumer assignments. // Success will be handled by stream leader. func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { @@ -3375,6 +3385,30 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // FIXME(dlc) - suppress duplicates? if sa := js.streamAssignment(result.Account, result.Stream); sa != nil { + canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second + + // See if we should retry in case this cluster is full but there are others. + if cfg, ci := sa.Config, sa.Client; cfg != nil && ci != nil && isInsufficientResourcesErr(result.Response.Error) && canDelete { + // If cluster is defined we can not retry. + if cfg.Placement == nil || cfg.Placement.Cluster == _EMPTY_ { + // If we have additional clusters to try we can retry. 
+ if ci != nil && len(ci.Alternates) > 0 { + if rg := js.createGroupForStream(ci, cfg); rg != nil { + s.Warnf("Retrying cluster placement for stream '%s > %s'", result.Account, result.Stream) + // Pick a new preferred leader. + rg.setPreferred() + // Get rid of previous attempt. + cc.meta.Propose(encodeDeleteStreamAssignment(sa)) + // Propose new. + sa.Group, sa.err = rg, nil + cc.meta.Propose(encodeAddStreamAssignment(sa)) + return + } + } + } + } + + // Respond to the user here. var resp string if result.Response != nil { resp = s.jsonResponse(result.Response) @@ -3385,11 +3419,8 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client sa.responded = true js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp) } - // Here we will remove this assignment, so this needs to only execute when we are sure - // this is what we want to do. - // TODO(dlc) - Could have mixed results, should track per peer. - // Set sa.err while we are deleting so we will not respond to list/names requests. - if !result.Update && time.Since(sa.Created) < 5*time.Second { + // Remove this assignment if possible. + if canDelete { sa.err = NewJSClusterNotAssignedError() cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } @@ -3562,31 +3593,77 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe } // selectPeerGroup will select a group of peers to start a raft group. -// TODO(dlc) - For now randomly select. Can be way smarter. -func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string) []string { - var nodes []string - peers := cc.meta.Peers() - s := cc.s +func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig) []string { + if cluster == _EMPTY_ || cfg == nil { + return nil + } + + var maxBytes uint64 + if cfg != nil && cfg.MaxBytes > 0 { + maxBytes = uint64(cfg.MaxBytes) + } + + // Used for weighted sorting based on availability. 
+ type wn struct { + id string + avail uint64 + } + + var nodes []wn + s, peers := cc.s, cc.meta.Peers() for _, p := range peers { - // If we know its offline or it is not in our list it probably shutdown, so don't consider. - if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(nodeInfo).offline { + si, ok := s.nodeToInfo.Load(p.ID) + if !ok || si == nil { continue } - if cluster != _EMPTY_ { - if s.clusterNameForNode(p.ID) == cluster { - nodes = append(nodes, p.ID) - } - } else { - nodes = append(nodes, p.ID) + ni := si.(nodeInfo) + // Only select from the designated named cluster. + // If we know its offline or we do not have config or stats don't consider. + if ni.cluster != cluster || ni.offline || ni.cfg == nil || ni.stats == nil { + continue } + + var available uint64 + switch cfg.Storage { + case MemoryStorage: + used := ni.stats.ReservedMemory + if ni.stats.Memory > used { + used = ni.stats.Memory + } + if ni.cfg.MaxMemory > int64(used) { + available = uint64(ni.cfg.MaxMemory) - used + } + case FileStorage: + used := ni.stats.ReservedStore + if ni.stats.Store > used { + used = ni.stats.Store + } + if ni.cfg.MaxStore > int64(used) { + available = uint64(ni.cfg.MaxStore) - used + } + } + + // Check if we have enough room if maxBytes set. + if maxBytes > 0 && maxBytes > available { + continue + } + // Add to our list of potential nodes. + nodes = append(nodes, wn{p.ID, available}) } + + // If we could not select enough peers, fail. if len(nodes) < r { return nil } - // Don't depend on range to randomize. - rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] }) - return nodes[:r] + // Sort based on available from most to least. 
+ sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail }) + + var results []string + for _, r := range nodes[:r] { + results = append(results, r.id) + } + return results } func groupNameForStream(peers []string, storage StorageType) string { @@ -3609,22 +3686,33 @@ func groupName(prefix string, peers []string, storage StorageType) string { // createGroupForStream will create a group for assignment for the stream. // Lock should be held. -func (cc *jetStreamCluster) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { +func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { replicas := cfg.Replicas if replicas == 0 { replicas = 1 } - cluster := ci.Cluster - if cfg.Placement != nil && cfg.Placement.Cluster != _EMPTY_ { + + // Default connected cluster from the request origin. + cc, cluster := js.cluster, ci.Cluster + // If specified, override the default. + clusterDefined := cfg.Placement != nil && cfg.Placement.Cluster != _EMPTY_ + if clusterDefined { cluster = cfg.Placement.Cluster } - // Need to create a group here. - // TODO(dlc) - Can be way smarter here. - peers := cc.selectPeerGroup(replicas, cluster) - if len(peers) == 0 { - return nil + clusters := []string{cluster} + if !clusterDefined { + clusters = append(clusters, ci.Alternates...) } - return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers} + + // Need to create a group here. 
+ for _, cn := range clusters { + peers := cc.selectPeerGroup(replicas, cn, cfg) + if len(peers) < replicas { + continue + } + return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers} + } + return nil } func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) { @@ -3670,8 +3758,8 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, return } - // Check for stream limits here before proposing. - if err := jsa.checkLimits(cfg); err != nil { + // Check for account limits here before proposing. + if err := jsa.checkAccountLimits(cfg); err != nil { resp.Error = NewJSStreamLimitsError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return @@ -3681,6 +3769,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, js.mu.Lock() defer js.mu.Unlock() + // If this stream already exists, turn this into a stream info call. if sa := js.streamAssignment(acc.Name, cfg.Name); sa != nil { // If they are the same then we will forward on as a stream info request. // This now matches single server behavior. @@ -3700,8 +3789,9 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return + } - } else if cfg.Sealed { + if cfg.Sealed { resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return @@ -3721,7 +3811,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } // Raft group selection and placement. 
- rg := cc.createGroupForStream(ci, cfg) + rg := js.createGroupForStream(ci, cfg) if rg == nil { resp.Error = NewJSInsufficientResourcesError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) @@ -3873,7 +3963,12 @@ func (s *Server) jsClusteredStreamPurgeRequest( s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } -func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredStreamRestoreRequest( + ci *ClientInfo, + acc *Account, + req *JSApiStreamRestoreRequest, + stream, subject, reply string, rmsg []byte) { + js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return @@ -3892,7 +3987,7 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r } // Raft group selection and placement. - rg := cc.createGroupForStream(ci, cfg) + rg := js.createGroupForStream(ci, cfg) if rg == nil { resp.Error = NewJSInsufficientResourcesError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index cfe2f16a..8b5453da 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -143,9 +143,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { Replicas: 2, MaxBytes: 15 * 1024 * 1024, }) - if err == nil || !strings.Contains(err.Error(), "insufficient storage") { - t.Fatalf("Expected %v but got %v", ApiErrors[JSStorageResourcesExceededErr], err) - } + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) } func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { @@ -966,20 +964,17 @@ func TestJetStreamClusterMaxBytesForStream(t *testing.T) { // Stream config. cfg := &nats.StreamConfig{ Name: "TEST", - Subjects: []string{"foo", "bar"}, Replicas: 2, + MaxBytes: 2 * 1024 * 1024 * 1024, // 2GB } - // 2GB - cfg.MaxBytes = 2 * 1024 * 1024 * 1024 - if _, err := js.AddStream(cfg); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + _, err = js.AddStream(cfg) + require_NoError(t, err) + // Make sure going over the single server limit though is enforced (for now). + cfg.Name = "TEST2" cfg.MaxBytes *= 2 _, err = js.AddStream(cfg) - if err == nil || !strings.Contains(err.Error(), "insufficient storage resources") { - t.Fatalf("Expected %q error, got %q", "insufficient storage resources", err.Error()) - } + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) } func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { @@ -9764,6 +9759,146 @@ func TestJetStreamSuperClusterPushConsumerInterest(t *testing.T) { } } +func TestJetStreamClusterOverflowPlacement(t *testing.T) { + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3) + defer sc.shutdown() + + pcn := "C2" + s := sc.clusterForName(pcn).randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // With this setup, we opted in for requiring MaxBytes, so this should error. 
+ _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Replicas: 3, + }) + require_Error(t, err, NewJSStreamMaxBytesRequiredError()) + + // R=2 on purpose to leave one server empty. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "foo", + Replicas: 2, + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + + // Now try to add another that will overflow the current cluster's reservation. + // Since we asked explicitly for the same cluster this should fail. + // Note this will not be testing the peer picker since the update has probably not made it to the meta leader. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "bar", + Replicas: 3, + MaxBytes: 2 * 1024 * 1024 * 1024, + Placement: &nats.Placement{Cluster: pcn}, + }) + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + + // Now test actual overflow placement. So try again with no placement designation. + // This will test the peer picker's logic since they are updated at this point and the meta leader + // knows it can not place it in C2. + si, err := js.AddStream(&nats.StreamConfig{ + Name: "bar", + Replicas: 3, + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + + // Make sure we did not get placed into C2. + falt := si.Cluster.Name + if falt == pcn { + t.Fatalf("Expected to be placed in another cluster besides %q, but got %q", pcn, falt) + } + + // One more time that should spill over again to our last cluster. + si, err = js.AddStream(&nats.StreamConfig{ + Name: "baz", + Replicas: 3, + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + + // Make sure we did not get placed into C2. + if salt := si.Cluster.Name; salt == pcn || salt == falt { + t.Fatalf("Expected to be placed in last cluster besides %q or %q, but got %q", pcn, falt, salt) + } + + // Now place a stream of R1 into C2 which should have space. 
+ si, err = js.AddStream(&nats.StreamConfig{ + Name: "dlc", + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + + if si.Cluster.Name != pcn { + t.Fatalf("Expected to be placed in our origin cluster %q, but got %q", pcn, si.Cluster.Name) + } +} + +func TestJetStreamClusterConcurrentOverflow(t *testing.T) { + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3) + defer sc.shutdown() + + pcn := "C2" + + startCh := make(chan bool) + var wg sync.WaitGroup + var swg sync.WaitGroup + + start := func(name string) { + wg.Add(1) + defer wg.Done() + + s := sc.clusterForName(pcn).randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + swg.Done() + <-startCh + + _, err := js.AddStream(&nats.StreamConfig{ + Name: name, + Replicas: 3, + MaxBytes: 2 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + } + + swg.Add(2) + go start("foo") + go start("bar") + swg.Wait() + // Now start both at same time. + close(startCh) + wg.Wait() +} + +func TestJetStreamClusterBalancedPlacement(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesTempl, "CB", 5) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // We have 10GB (2GB X 5) available. + // Use MaxBytes for ease of test (used works too) and place 5 1GB streams with R=2. + for i := 1; i <= 5; i++ { + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("S-%d", i), + Replicas: 2, + MaxBytes: 1 * 1024 * 1024 * 1024, + }) + require_NoError(t, err) + } + // Make sure the next one fails properly. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "FAIL", + Replicas: 2, + MaxBytes: 1 * 1024 * 1024 * 1024, + }) + require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) +} + // Support functions // Used to setup superclusters for tests. @@ -9829,6 +9964,36 @@ var jsClusterTempl = ` accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" 
} ] } } ` +var jsClusterMaxBytesTempl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + no_auth_user: u + + accounts { + $U { + users = [ { user: "u", pass: "p" } ] + jetstream: { + max_mem: 128MB + max_file: 8GB + max_bytes: true // Forces streams to indicate max_bytes. + } + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + var jsSuperClusterTempl = ` %s gateway { @@ -10733,10 +10898,10 @@ func (c *cluster) waitOnLeader() { expires := time.Now().Add(40 * time.Second) for time.Now().Before(expires) { if leader := c.leader(); leader != nil { - time.Sleep(100 * time.Millisecond) + time.Sleep(250 * time.Millisecond) return } - time.Sleep(25 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } c.t.Fatalf("Expected a cluster leader, got none") @@ -10751,7 +10916,7 @@ func (c *cluster) waitOnClusterReady() { func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) { c.t.Helper() var leader *Server - expires := time.Now().Add(20 * time.Second) + expires := time.Now().Add(40 * time.Second) for time.Now().Before(expires) { if leader = c.leader(); leader != nil { break @@ -10761,10 +10926,10 @@ func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) { // Now make sure we have all peers. 
for leader != nil && time.Now().Before(expires) { if len(leader.JetStreamClusterPeers()) == numPeersExpected { - time.Sleep(100 * time.Millisecond) + time.Sleep(250 * time.Millisecond) return } - time.Sleep(50 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } peersSeen := len(leader.JetStreamClusterPeers()) diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 9eaf94a5..2b105f28 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -263,6 +263,9 @@ const ( // JSStreamLimitsErrF General stream limits exceeded error string ({err}) JSStreamLimitsErrF ErrorIdentifier = 10053 + // JSStreamMaxBytesRequired account requires a stream config to have max bytes set + JSStreamMaxBytesRequired ErrorIdentifier = 10113 + // JSStreamMessageExceedsMaximumErr message size exceeds maximum allowed JSStreamMessageExceedsMaximumErr ErrorIdentifier = 10054 @@ -427,6 +430,7 @@ var ( JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"}, JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"}, JSStreamLimitsErrF: {Code: 500, ErrCode: 10053, Description: "{err}"}, + JSStreamMaxBytesRequired: {Code: 400, ErrCode: 10113, Description: "account requires a stream config to have max bytes set"}, JSStreamMessageExceedsMaximumErr: {Code: 400, ErrCode: 10054, Description: "message size exceeds maximum allowed"}, JSStreamMirrorNotUpdatableErr: {Code: 400, ErrCode: 10055, Description: "Mirror configuration can not be updated"}, JSStreamMismatchErr: {Code: 400, ErrCode: 10056, Description: "stream name in subject does not match request"}, @@ -1457,6 +1461,16 @@ func NewJSStreamLimitsError(err error, opts ...ErrorOption) *ApiError { } } +// NewJSStreamMaxBytesRequiredError creates a new JSStreamMaxBytesRequired error: "account requires a stream config to have max bytes set" +func 
NewJSStreamMaxBytesRequiredError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSStreamMaxBytesRequired] +} + // NewJSStreamMessageExceedsMaximumError creates a new JSStreamMessageExceedsMaximumErr error: "message size exceeds maximum allowed" func NewJSStreamMessageExceedsMaximumError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/monitor.go b/server/monitor.go index bd66c3cf..1f8c484b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2019 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2417,11 +2417,10 @@ type JSInfo struct { Disabled bool `json:"disabled,omitempty"` Config JetStreamConfig `json:"config,omitempty"` JetStreamStats - APICalls int64 `json:"current_api_calls"` - Streams int `json:"total_streams,omitempty"` - Consumers int `json:"total_consumers,omitempty"` - Messages uint64 `json:"total_messages,omitempty"` - Bytes uint64 `json:"total_message_bytes,omitempty"` + Streams int `json:"streams"` + Consumers int `json:"consumers"` + Messages uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` // aggregate raft info @@ -2576,7 +2575,6 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { accounts = append(accounts, info) } s.js.mu.RUnlock() - jsi.APICalls = atomic.LoadInt64(&s.js.apiCalls) if mg := s.js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { diff --git a/server/monitor_test.go b/server/monitor_test.go index 9b0fb508..c4b75903 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2020 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under 
the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -4012,6 +4012,7 @@ func TestMonitorJsz(t *testing.T) { } ACC { users [{user: usr, password: pwd}] + // In clustered mode, these reservations will not impact any one server. jetstream: {max_store: 4Mb, max_memory: 5Mb} } BCC_TO_HAVE_ONE_EXTRA { @@ -4087,12 +4088,6 @@ func TestMonitorJsz(t *testing.T) { if info.Messages != 1 { t.Fatalf("expected one message but got %d", info.Messages) } - if info.ReservedStore != 4*1024*1024 { - t.Fatalf("expected 4Mb reserved, got %d bytes", info.ReservedStore) - } - if info.ReservedMemory != 5*1024*1024 { - t.Fatalf("expected 5Mb reserved, got %d bytes", info.ReservedStore) - } } }) t.Run("accounts", func(t *testing.T) { @@ -4207,110 +4202,6 @@ func TestMonitorJsz(t *testing.T) { }) } -func TestMonitorJszAccountReserves(t *testing.T) { - readJsInfo := func(url string) *JSInfo { - t.Helper() - body := readBody(t, url) - info := &JSInfo{} - err := json.Unmarshal(body, info) - require_NoError(t, err) - return info - } - tmpDir := createDir(t, "srv") - defer removeDir(t, tmpDir) - tmplCfg := ` - listen: 127.0.0.1:-1 - http_port: 7501 - system_account: SYS - jetstream: { - store_dir: %s - } - accounts { - SYS { users [{user: sys, password: pwd}] } - ACC { - users [{user: usr, password: pwd}] - %s - } - }` - - conf := createConfFile(t, []byte(fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled"))) - defer removeFile(t, conf) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - checkForJSClusterUp(t, s) - - nc := natsConnect(t, fmt.Sprintf("nats://usr:pwd@127.0.0.1:%d", s.opts.Port)) - defer nc.Close() - js, err := nc.JetStream(nats.MaxWait(5 * time.Second)) - require_NoError(t, err) - for _, v := range []struct { - subject string - storage nats.StorageType - }{ - {"file", nats.FileStorage}, - {"mem", nats.MemoryStorage}} { - _, err = js.AddStream(&nats.StreamConfig{ - Name: 
v.subject, - Subjects: []string{v.subject}, - Replicas: 1, - Storage: v.storage, - }) - require_NoError(t, err) - require_NoError(t, nc.Flush()) - } - - send := func() { - for _, subj := range []string{"file", "mem"} { - _, err = js.Publish(subj, []byte("hello world "+subj)) - require_NoError(t, err) - } - require_NoError(t, nc.Flush()) - } - - test := func(msgs, reservedMemory, reservedStore uint64, totalIsReserveUsd bool) { - t.Helper() - info := readJsInfo(fmt.Sprintf("http://127.0.0.1:%d/jsz", s.opts.HTTPPort)) - if info.Streams != 2 { - t.Fatalf("expected stream count to be 1 but got %d", info.Streams) - } - if info.Messages != msgs { - t.Fatalf("expected one message but got %d", info.Messages) - } - if info.ReservedStore != reservedStore { - t.Fatalf("expected %d bytes reserved, got %d bytes", reservedStore, info.ReservedStore) - } - if info.ReservedMemory != reservedMemory { - t.Fatalf("expected %d bytes reserved, got %d bytes", reservedMemory, info.ReservedStore) - } - if info.Memory == 0 { - t.Fatalf("memory expected to be not 0") - } - if info.Store == 0 { - t.Fatalf("store expected to be not 0") - } - memory, store := uint64(0), uint64(0) - if totalIsReserveUsd { - memory = info.Memory - store = info.Store - } - if info.ReservedMemoryUsed != memory { - t.Fatalf("expected %d bytes reserved memory used, got %d bytes", memory, info.ReservedMemoryUsed) - } - if info.ReserveStoreUsed != store { - t.Fatalf("expected %d bytes reserved store used, got %d bytes", store, info.ReserveStoreUsed) - } - } - - send() - test(2, 0, 0, false) - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: {max_mem: 4Mb, max_store: 5Mb}")) - test(2, 4*1024*1024, 5*1024*1024, true) - send() - test(4, 4*1024*1024, 5*1024*1024, true) - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled")) - test(4, 0, 0, false) -} - func TestMonitorReloadTLSConfig(t *testing.T) { template := ` listen: "127.0.0.1:-1" diff --git a/server/mqtt_test.go 
b/server/mqtt_test.go index e39cf0af..4a9096b0 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -3164,6 +3164,8 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) { checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { for _, s := range cluster { if s.JetStreamIsLeader() { + // Need to wait for usage updates now to propagate to meta leader. + time.Sleep(250 * time.Millisecond) return nil } } @@ -3180,13 +3182,9 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) { lno.Accounts = append(lno.Accounts, NewAccount("unused-account")) fallthrough case 1: - lno.JsAccDefaultDomain = map[string]string{ - "$G": "", - } + lno.JsAccDefaultDomain = map[string]string{"$G": ""} case 2: - lno.JsAccDefaultDomain = map[string]string{ - "$G": o1.JetStreamDomain, - } + lno.JsAccDefaultDomain = map[string]string{"$G": o1.JetStreamDomain} case 3: // turn off jetstream in $G by adding another account and set mqtt domain option lno.Accounts = append(lno.Accounts, NewAccount("unused-account")) diff --git a/server/opts.go b/server/opts.go index 7e05afe2..044ce294 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1,4 +1,4 @@ -// Copyright 2012-2021 The NATS Authors +// Copyright 2012-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1601,7 +1601,7 @@ func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error) return nil } -var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1} +var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1, false} // Parses jetstream account limits for an account. Simple setup with boolen is allowed, and we will // use dynamic account limits. 
@@ -1626,7 +1626,7 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} } case map[string]interface{}: - jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1} + jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1, false} for mk, mv := range vv { tk, mv = unwrapValue(mv, &lt) switch strings.ToLower(mk) { @@ -1654,6 +1654,12 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxConsumers = int(vv) + case "max_bytes_required", "max_stream_bytes", "max_bytes": + vv, ok := mv.(bool) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable bool for %q, got %v", mk, mv)} + } + jsLimits.MaxBytesRequired = bool(vv) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/route.go b/server/route.go index 652c0c09..2942912d 100644 --- a/server/route.go +++ b/server/route.go @@ -1,4 +1,4 @@ -// Copyright 2013-2020 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1422,7 +1422,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { s.remotes[id] = c // check to be consistent and future proof. 
but will be same domain if s.sameDomain(info.Domain) { - s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, info.Domain, id, false, info.JetStream}) + s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, info.Domain, id, nil, nil, false, info.JetStream}) } c.mu.Lock() c.route.connectURLs = info.ClientConnectURLs diff --git a/server/server.go b/server/server.go index 8b5e224c..6e3ded5e 100644 --- a/server/server.go +++ b/server/server.go @@ -1,4 +1,4 @@ -// Copyright 2012-2021 The NATS Authors +// Copyright 2012-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -274,6 +274,8 @@ type nodeInfo struct { cluster string domain string id string + cfg *JetStreamConfig + stats *JetStreamStats offline bool js bool } @@ -384,9 +386,19 @@ func NewServer(opts *Options) (*Server, error) { s.mu.Lock() defer s.mu.Unlock() - // Place ourselves in some lookup maps. - ourNode := string(getHash(serverName)) - s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, opts.JetStreamDomain, info.ID, false, opts.JetStream}) + // Place ourselves in the JetStream nodeInfo if needed. + if opts.JetStream { + ourNode := string(getHash(serverName)) + s.nodeToInfo.Store(ourNode, nodeInfo{ + serverName, + opts.Cluster.Name, + opts.JetStreamDomain, + info.ID, + &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore}, + nil, + false, true, + }) + } s.routeResolver = opts.Cluster.resolver if s.routeResolver == nil { @@ -560,6 +572,11 @@ func (s *Server) isClusterNameDynamic() bool { return s.getOpts().Cluster.Name == _EMPTY_ } +// Returns our configured serverName. +func (s *Server) serverName() string { + return s.getOpts().ServerName +} + // ClientURL returns the URL used to connect clients. 
Helpful in testing // when we designate a random client port (-1). func (s *Server) ClientURL() string { diff --git a/server/stream.go b/server/stream.go index aeebb423..a1cc58ec 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -314,8 +314,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, ApiErrors[JSStreamNameExistErr] } } - // Check for limits. - if err := jsa.checkLimits(&cfg); err != nil { + // Check for account and server limits. + if err := jsa.checkAllLimits(&cfg); err != nil { jsa.mu.Unlock() return nil, err } @@ -442,6 +442,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Setup our internal send go routine. mset.setupSendCapabilities() + // Reserve resources if MaxBytes present. + mset.js.reserveStreamResources(&mset.cfg) + // Call directly to set leader if not in clustered mode. // This can be called though before we actually setup clustering, so check both. if singleServerMode { @@ -965,7 +968,7 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, } // Check limits. - if err := jsa.checkLimits(&cfg); err != nil { + if err := jsa.checkAllLimits(&cfg); err != nil { return nil, err } return &cfg, nil @@ -3319,7 +3322,7 @@ func (mset *stream) delete() error { // Internal function to stop or delete the stream. 
func (mset *stream) stop(deleteFlag, advisory bool) error { mset.mu.RLock() - jsa := mset.jsa + js, jsa := mset.js, mset.jsa mset.mu.RUnlock() if jsa == nil { @@ -3427,6 +3430,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if err := mset.store.Delete(); err != nil { return err } + js.releaseStreamResources(&mset.cfg) } else if err := mset.store.Stop(); err != nil { return err }