When checking limits we would check total ask against the server limits if limits were not set.

We were also dynamically setting account limits based on a single server limit.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-06-12 10:20:50 -07:00
parent ecc2545e6d
commit ceebc3ae07
3 changed files with 55 additions and 24 deletions

View File

@@ -811,13 +811,15 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
sendq := s.sys.sendq
s.mu.Unlock()
js := s.getJetStream()
// No limits means we dynamically set up limits.
// We also place limits here so we know that the account is configured for JetStream.
if limits == nil {
limits = dynamicJSAccountLimits
}
js := s.getJetStream()
if js == nil {
// Place limits here so we know that the account is configured for JetStream.
if limits == nil {
limits = dynamicJSAccountLimits
}
a.assignJetStreamLimits(limits)
return ApiErrors[JSNotEnabledErr]
}
@@ -826,9 +828,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
return fmt.Errorf("jetstream can not be enabled on the system account")
}
// No limits means we dynamically set up limits.
if limits == nil {
limits = js.dynamicAccountLimits()
limits = dynamicJSAccountLimits
}
a.assignJetStreamLimits(limits)
@@ -1139,7 +1140,7 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
}
if limits == nil {
limits = js.dynamicAccountLimits()
limits = dynamicJSAccountLimits
}
// Calculate the delta between what we have and what we want.
@@ -1441,7 +1442,7 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error {
// Check storage, memory or disk.
if config.MaxBytes > 0 {
return jsa.checkBytesLimits(config.MaxBytes*int64(config.Replicas), config.Storage)
return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas)
}
return nil
}
@@ -1449,17 +1450,21 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error {
// Check if additional bytes will exceed our account limits.
// This should account for replicas.
// Lock should be held.
func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) error {
func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int) error {
if replicas < 1 {
replicas = 1
}
js, totalBytes := jsa.js, addBytes*int64(replicas)
switch storage {
case MemoryStorage:
// Account limits defined.
if jsa.limits.MaxMemory > 0 {
if jsa.memReserved+addBytes > jsa.limits.MaxMemory {
if jsa.memReserved+totalBytes > jsa.limits.MaxMemory {
return ApiErrors[JSMemoryResourcesExceededErr]
}
} else {
// Account is unlimited, check if this server can handle request.
js := jsa.js
if js.memReserved+addBytes > js.config.MaxMemory {
return ApiErrors[JSMemoryResourcesExceededErr]
}
@@ -1467,12 +1472,11 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) erro
case FileStorage:
// Account limits defined.
if jsa.limits.MaxStore > 0 {
if jsa.storeReserved+addBytes > jsa.limits.MaxStore {
if jsa.storeReserved+totalBytes > jsa.limits.MaxStore {
return ApiErrors[JSStorageResourcesExceededErr]
}
} else {
// Account is unlimited, check if this server can handle request.
js := jsa.js
if js.storeReserved+addBytes > js.config.MaxStore {
return ApiErrors[JSStorageResourcesExceededErr]
}
@@ -1533,15 +1537,6 @@ func (js *jetStream) lookupAccount(a *Account) *jsAccount {
return jsa
}
// Will dynamically create limits for this account.
func (js *jetStream) dynamicAccountLimits() *JetStreamAccountLimits {
js.mu.RLock()
// For now use all resources. Mostly meant for $G in non-account mode.
limits := &JetStreamAccountLimits{js.config.MaxMemory, js.config.MaxStore, -1, -1}
js.mu.RUnlock()
return limits
}
// Report on JetStream stats and usage for this server.
func (js *jetStream) usageStats() *JetStreamStats {
var stats JetStreamStats

View File

@@ -968,6 +968,42 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) {
}
}
func TestJetStreamClusterMaxBytesForStream(t *testing.T) {
// Has max_file_store of 2GB
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
info, err := js.AccountInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Make sure we still are dynamic.
if info.Limits.MaxStore != -1 || info.Limits.MaxMemory != -1 {
t.Fatalf("Expected dynamic limits for the account, got %+v\n", info.Limits)
}
// Stream config.
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 2,
}
// 2GB
cfg.MaxBytes = 2 * 1024 * 1024 * 1024
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Make sure going over the single server limit though is enforced (for now).
cfg.MaxBytes *= 2
_, err = js.AddStream(cfg)
if err == nil || !strings.Contains(err.Error(), "insufficient storage resources") {
t.Fatalf("Expected %q error, got %q", "insufficient storage resources", err.Error())
}
}
func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

View File

@@ -2382,7 +2382,7 @@ func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo {
}
peers := node.Peers()
peerList := make([]string, len(peers))
for i, p := range node.Peers() {
for i, p := range peers {
peerList[i] = p.ID
}
group := &raftGroup{