diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f7d82926..6e008142 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1603,6 +1603,16 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor return errors.New("shutting down") } + // Check here to see if we have a max HA Assets limit set. + if maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets; maxHaAssets > 0 { + if s.numRaftNodes() > maxHaAssets { + s.Warnf("Maximum HA Assets limit reached: %d", maxHaAssets) + // Since the meta leader assigned this, send a statsz update to them to get them up to date. + go s.sendStatszUpdate() + return errors.New("system limit reached") + } + } + storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) var store StreamStore if storage == FileStorage { @@ -4822,7 +4832,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo err.noStorage = true continue } - // HAAssets contain _meta_ which we want to ignore + // HAAssets contain _meta_ which we want to ignore, hence > and not >=. if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets { + s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 5843234c..7ec223d2 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1281,3 +1281,37 @@ func TestJetStreamClusterScaleDownWhileNoQuorum(t *testing.T) { checkClusterFormed(t, c.servers...) c.waitOnStreamLeader(globalAccountName, "TEST") } + +// We noticed that ha_assets enforcement seemed not to be upheld when assets were created in a rapid fashion.
+func TestJetStreamClusterHAssetsEnforcement(t *testing.T) { + tmpl := strings.Replace(jsClusterTempl, "store_dir:", "limits: {max_ha_assets: 2}, store_dir:", 1) + c := createJetStreamClusterWithTemplateAndModHook(t, tmpl, "R3S", 3, nil) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST-1", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST-2", + Subjects: []string{"bar"}, + Replicas: 3, + }) + require_NoError(t, err) + + exceededErrs := []error{errors.New("system limit reached"), errors.New("no suitable peers")} + + // Should fail. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST-3", + Subjects: []string{"baz"}, + Replicas: 3, + }) + require_Error(t, err, exceededErrs...) +} diff --git a/server/test_test.go b/server/test_test.go index 8b757193..9aa46522 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -106,11 +106,11 @@ func require_Error(t *testing.T, err error, expected ...error) { } for _, e := range expected { - if err == e || strings.Contains(e.Error(), eStr) { + if err == e || strings.Contains(eStr, e.Error()) || strings.Contains(e.Error(), eStr) { return } } - t.Fatalf("Expected one of %+v, got '%v'", expected, err) + t.Fatalf("Expected one of %v, got '%v'", expected, err) } func require_Equal(t *testing.T, a, b string) {