diff --git a/server/jetstream.go b/server/jetstream.go index 9c69fd7d..3ae418fb 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -811,13 +811,15 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { sendq := s.sys.sendq s.mu.Unlock() - js := s.getJetStream() + // No limits means we dynamically set up limits. + // We also place limits here so we know that the account is configured for JetStream. + if limits == nil { + limits = dynamicJSAccountLimits + } + js := s.getJetStream() if js == nil { - // Place limits here so we know that the account is configured for JetStream. - if limits == nil { - limits = dynamicJSAccountLimits - } + a.assignJetStreamLimits(limits) return ApiErrors[JSNotEnabledErr] } @@ -826,9 +828,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { return fmt.Errorf("jetstream can not be enabled on the system account") } - // No limits means we dynamically set up limits. if limits == nil { - limits = js.dynamicAccountLimits() + limits = dynamicJSAccountLimits } a.assignJetStreamLimits(limits) @@ -1139,7 +1140,7 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { } if limits == nil { - limits = js.dynamicAccountLimits() + limits = dynamicJSAccountLimits } // Calculate the delta between what we have and what we want. @@ -1441,7 +1442,7 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error { // Check storage, memory or disk. if config.MaxBytes > 0 { - return jsa.checkBytesLimits(config.MaxBytes*int64(config.Replicas), config.Storage) + return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas) } return nil } @@ -1449,17 +1450,21 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error { // Check if additional bytes will exceed our account limits. // This should account for replicas. // Lock should be held. -func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) error { +func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int) error { + if replicas < 1 { + replicas = 1 + } + js, totalBytes := jsa.js, addBytes*int64(replicas) + switch storage { case MemoryStorage: // Account limits defined. if jsa.limits.MaxMemory > 0 { - if jsa.memReserved+addBytes > jsa.limits.MaxMemory { + if jsa.memReserved+totalBytes > jsa.limits.MaxMemory { return ApiErrors[JSMemoryResourcesExceededErr] } } else { // Account is unlimited, check if this server can handle request. - js := jsa.js if js.memReserved+addBytes > js.config.MaxMemory { return ApiErrors[JSMemoryResourcesExceededErr] } @@ -1467,12 +1472,11 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) erro case FileStorage: // Account limits defined. if jsa.limits.MaxStore > 0 { - if jsa.storeReserved+addBytes > jsa.limits.MaxStore { + if jsa.storeReserved+totalBytes > jsa.limits.MaxStore { return ApiErrors[JSStorageResourcesExceededErr] } } else { // Account is unlimited, check if this server can handle request. - js := jsa.js if js.storeReserved+addBytes > js.config.MaxStore { return ApiErrors[JSStorageResourcesExceededErr] } @@ -1533,15 +1537,6 @@ func (js *jetStream) lookupAccount(a *Account) *jsAccount { return jsa } -// Will dynamically create limits for this account. -func (js *jetStream) dynamicAccountLimits() *JetStreamAccountLimits { - js.mu.RLock() - // For now use all resources. Mostly meant for $G in non-account mode. - limits := &JetStreamAccountLimits{js.config.MaxMemory, js.config.MaxStore, -1, -1} - js.mu.RUnlock() - return limits -} - // Report on JetStream stats and usage for this server. func (js *jetStream) usageStats() *JetStreamStats { var stats JetStreamStats diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 9a9936e4..2ecde582 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -968,6 +968,42 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { } } +func TestJetStreamClusterMaxBytesForStream(t *testing.T) { + // Has max_file_store of 2GB + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + info, err := js.AccountInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Make sure we still are dynamic. + if info.Limits.MaxStore != -1 || info.Limits.MaxMemory != -1 { + t.Fatalf("Expected dynamic limits for the account, got %+v\n", info.Limits) + } + // Stream config. + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 2, + } + // 2GB + cfg.MaxBytes = 2 * 1024 * 1024 * 1024 + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Make sure going over the single server limit though is enforced (for now). + cfg.MaxBytes *= 2 + _, err = js.AddStream(cfg) + if err == nil || !strings.Contains(err.Error(), "insufficient storage resources") { + t.Fatalf("Expected %q error, got %q", "insufficient storage resources", err.Error()) + } +} + func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/monitor.go b/server/monitor.go index 0cce3420..9e27323b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2382,7 +2382,7 @@ func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo { } peers := node.Peers() peerList := make([]string, len(peers)) - for i, p := range node.Peers() { + for i, p := range peers { peerList[i] = p.ID } group := &raftGroup{