From 52c4872666a26acdf475cb89ae9b5f367324fadd Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Sat, 6 Aug 2022 00:17:01 +0200 Subject: [PATCH] better error when peer selection fails (#3342) * better error when peer selection fails It is pretty hard to diagnose what went wrong when not enough peers for an operation where found. This change now returns counts of reasons why peers where discarded. Changed the error to JSClusterNoPeers as it seems more appropriate of an error for that operation. Not having enough resources is one of the conditions for a peer not being considered. But so is having a non matching tag. Which is why JSClusterNoPeers seems more appropriate In addition, JSClusterNoPeers was already used as error after one call to selectPeerGroup already. example: no suitable peers for placement: peer selection cluster 'C' with 3 peers offline: 0 excludeTag: 1 noTagMatch: 2 noSpace: 0 uniqueTag: 0 misc: 0 Examle for mqtt: mid:12 - "mqtt" - unable to connect: create sessions stream for account "$G": no suitable peers for placement: peer selection cluster 'MQTT' with 3 peers offline: 0 excludeTag: 0 noTagMatch: 0 noSpace: 0 uniqueTag: 0 misc: 0 (10005) Signed-off-by: Matthias Hanel * review comment Signed-off-by: Matthias Hanel --- server/errors.json | 4 +- server/jetstream_api.go | 10 +- server/jetstream_cluster.go | 144 ++++++++++++++++++------- server/jetstream_cluster_test.go | 13 +-- server/jetstream_errors_generated.go | 18 ++-- server/jetstream_leafnode_test.go | 4 +- server/jetstream_super_cluster_test.go | 17 +-- server/mqtt_test.go | 5 +- 8 files changed, 148 insertions(+), 67 deletions(-) diff --git a/server/errors.json b/server/errors.json index 530f6e70..5730766e 100644 --- a/server/errors.json +++ b/server/errors.json @@ -80,10 +80,10 @@ "deprecates": "" }, { - "constant": "JSClusterNoPeersErr", + "constant": "JSClusterNoPeersErrF", "code": 400, "error_code": 10005, - "description": "no suitable peers for placement", + "description": "no suitable peers for placement: {err}", "comment": "", "help": "", "url": "", diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c8f0921c..80ae12a5 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2357,7 +2357,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...) } - peers := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1) + peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1) if len(peers) <= cfg.Replicas { // since expanding in the same cluster did not yield a result, try in different cluster peers = nil @@ -2369,16 +2369,18 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ } return true }) + errs := selectPeerErrors{e} for cluster := range clusters { - newPeers := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) + newPeers, _ := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) if len(newPeers) >= cfg.Replicas { peers = append([]string{}, currPeers...) peers = append(peers, newPeers[:cfg.Replicas]...) break } + errs = append(errs, e) } if peers == nil { - resp.Error = NewJSClusterNoPeersError() + resp.Error = NewJSClusterNoPeersError(&errs) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -2619,7 +2621,7 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun } } if len(peers) == 0 { - resp.Error = NewJSClusterNoPeersError() + resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected")) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index bc5d23f0..a47dc1ac 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4245,7 +4245,9 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client if cfg.Placement == nil || cfg.Placement.Cluster == _EMPTY_ { // If we have additional clusters to try we can retry. if ci != nil && len(ci.Alternates) > 0 { - if rg := js.createGroupForStream(ci, cfg); rg != nil { + if rg, err := js.createGroupForStream(ci, cfg); err != nil { + s.Warnf("Retrying cluster placement for stream '%s > %s' failed due to placement error: %+v", result.Account, result.Stream, err) + } else { if org := sa.Group; org != nil && len(org.Peers) > 0 { s.Warnf("Retrying cluster placement for stream '%s > %s' due to insufficient resources in cluster %q", result.Account, result.Stream, s.clusterNameForNode(org.Peers[0])) @@ -4476,11 +4478,44 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe return false } +type selectPeerError struct { + cluster string + clusterPeers int + offline int + excludeTag int + noTagMatch int + noStorage int + uniqueTag int + misc int +} + +func (e *selectPeerError) Error() string { + return fmt.Sprintf(`peer selection cluster '%s' with %d peers +offline: %d +excludeTag: %d +noTagMatch: %d +noStorage: %d +uniqueTag: %d +misc: %d +`, + e.cluster, e.clusterPeers, e.offline, e.excludeTag, e.noTagMatch, e.noStorage, e.uniqueTag, e.misc) +} + +type selectPeerErrors []*selectPeerError + +func (e *selectPeerErrors) Error() string { + errors := make([]string, len(*e)) + for i, err := range *e { + errors[i] = err.Error() + } + return strings.Join(errors, "\n") +} + // selectPeerGroup will select a group of peers to start a raft group. // when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped -func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) (re []string) { +func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) ([]string, *selectPeerError) { if cluster == _EMPTY_ || cfg == nil { - return nil + return nil, &selectPeerError{cluster: cluster, misc: 1} } var maxBytes uint64 @@ -4515,28 +4550,28 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo } } } - var uniqueTags = make(map[string]struct{}) + var uniqueTags = make(map[string]*nodeInfo) - checkUniqueTag := func(ni *nodeInfo) bool { - // default requires the unique prefix to be present - isUnique := false + checkUniqueTag := func(ni *nodeInfo) (bool, *nodeInfo) { for _, t := range ni.tags { if strings.HasPrefix(t, uniqueTagPrefix) { - if _, ok := uniqueTags[t]; !ok { - uniqueTags[t] = struct{}{} - isUnique = true + if n, ok := uniqueTags[t]; !ok { + uniqueTags[t] = ni + return true, ni + } else { + return false, n } - break } } - return isUnique + // default requires the unique prefix to be present + return false, nil } // Map existing. var ep map[string]struct{} if le := len(existing); le > 0 { if le >= r { - return existing + return existing[:r], nil } ep = make(map[string]struct{}) for i, p := range existing { @@ -4556,17 +4591,30 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets + // An error is a result of multiple individual placement decisions. + // Which is why we keep taps on how often which one happened. + err := selectPeerError{cluster: cluster} + // Shuffle them up. rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) for _, p := range peers { si, ok := s.nodeToInfo.Load(p.ID) if !ok || si == nil { + err.misc++ continue } ni := si.(nodeInfo) // Only select from the designated named cluster. - // If we know its offline or we do not have config or stats don't consider. - if ni.cluster != cluster || ni.offline || ni.cfg == nil || ni.stats == nil { + if ni.cluster != cluster { + s.Debugf("Peer selection: discard %s@%s reason: not target cluster %s", ni.name, ni.cluster, cluster) + continue + } + err.clusterPeers++ + + // If we know its offline or we do not have config or err don't consider. + if ni.offline || ni.cfg == nil || ni.stats == nil { + s.Debugf("Peer selection: discard %s@%s reason: offline", ni.name, ni.cluster) + err.offline++ continue } @@ -4578,6 +4626,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo } if ni.tags.Contains(jsExcludePlacement) { + s.Debugf("Peer selection: discard %s@%s tags: %v reason: %s present", + ni.name, ni.cluster, ni.tags, jsExcludePlacement) + err.excludeTag++ continue } @@ -4586,10 +4637,13 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo for _, t := range tags { if !ni.tags.Contains(t) { matched = false + s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present", + ni.name, ni.cluster, ni.tags, t) break } } if !matched { + err.noTagMatch++ continue } } @@ -4620,19 +4674,31 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo // Otherwise check if we have enough room if maxBytes set. if maxBytes > 0 && maxBytes > available { - s.Warnf("%s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", + s.Warnf("Peer selection: discard %s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", ni.name, ni.cluster, maxBytes, cfg.Storage.String(), available) + err.noStorage++ continue } // HAAssets contain _meta_ which we want to ignore if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets { - s.Warnf("%s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", + s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets) + err.misc++ continue } - if uniqueTagPrefix != _EMPTY_ && !checkUniqueTag(&ni) { - continue + if uniqueTagPrefix != _EMPTY_ { + if unique, owner := checkUniqueTag(&ni); !unique { + if owner != nil { + s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s owned by %s@%s", + ni.name, ni.cluster, ni.tags, owner.name, owner.cluster) + } else { + s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s not present", + ni.name, ni.cluster, ni.tags) + } + err.uniqueTag++ + continue + } } // Add to our list of potential nodes. nodes = append(nodes, wn{p.ID, available, ha}) @@ -4640,7 +4706,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo // If we could not select enough peers, fail. if len(nodes) < (r - len(existing)) { - return nil + s.Debugf("Peer selection: required %d nodes but found %d (cluster: %s replica: %d existing: %v/%d peers: %d result-peers: %d err: %+v)", + (r - len(existing)), len(nodes), cluster, r, existing, replaceFirstExisting, len(peers), len(nodes), err) + return nil, &err } // Sort based on available from most to least. sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail }) @@ -4658,7 +4726,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo for _, r := range nodes[:r] { results = append(results, r.id) } - return results + return results, nil } func groupNameForStream(peers []string, storage StorageType) string { @@ -4705,7 +4773,7 @@ func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier stri // createGroupForStream will create a group for assignment for the stream. // Lock should be held. -func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { +func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerErrors) { replicas := cfg.Replicas if replicas == 0 { replicas = 1 @@ -4724,14 +4792,16 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *ra } // Need to create a group here. + errFirst := selectPeerErrors{} for _, cn := range clusters { - peers := cc.selectPeerGroup(replicas, cn, cfg, nil, 0) + peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0) if len(peers) < replicas { + errFirst = append(errFirst, err) continue } - return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn} + return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}, nil } - return nil + return nil, &errFirst } func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) { @@ -4847,9 +4917,9 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } // Raft group selection and placement. - rg := js.createGroupForStream(ci, cfg) - if rg == nil { - resp.Error = NewJSInsufficientResourcesError() + rg, err := js.createGroupForStream(ci, cfg) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -5016,9 +5086,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su if isReplicaChange { // We are adding new peers here. if newCfg.Replicas > len(rg.Peers) { - peers := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0) - if len(peers) != newCfg.Replicas { - resp.Error = NewJSInsufficientResourcesError() + peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -5089,9 +5159,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } } else if isMoveRequest { if len(peerSet) == 0 { - nrg := js.createGroupForStream(ci, newCfg) - if nrg == nil { - resp.Error = NewJSInsufficientResourcesError() + nrg, err := js.createGroupForStream(ci, newCfg) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -5275,9 +5345,9 @@ func (s *Server) jsClusteredStreamRestoreRequest( } // Raft group selection and placement. - rg := js.createGroupForStream(ci, cfg) - if rg == nil { - resp.Error = NewJSInsufficientResourcesError() + rg, err := js.createGroupForStream(ci, cfg) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5f91d74b..b7e60f87 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -148,7 +148,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { Replicas: 2, MaxBytes: 15 * 1024 * 1024, }) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { @@ -1047,7 +1047,7 @@ func TestJetStreamClusterMaxBytesForStream(t *testing.T) { cfg.Name = "TEST2" cfg.MaxBytes *= 2 _, err = js.AddStream(cfg) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { @@ -3634,7 +3634,8 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { c.Subjects = []string{c.Name} _, err := js.AddStream(&c) require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", + "excludeTag: 1", "offline: 0", "uniqueTag: 0") } // Test update failure @@ -3645,8 +3646,8 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { cfg.Replicas = 3 _, err = js.UpdateStream(cfg) require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") - + require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", + "excludeTag: 1", "offline: 0", "uniqueTag: 0") // Test tag reload removing !jetstream tag, and allowing placement again srv := c.serverByName("S-1") @@ -9398,7 +9399,7 @@ func TestJetStreamClusterBalancedPlacement(t *testing.T) { Replicas: 2, MaxBytes: 1 * 1024 * 1024 * 1024, }) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } func TestJetStreamClusterConsumerPendingBug(t *testing.T) { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index c5de171f..ce9f62ac 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -14,8 +14,8 @@ const ( // JSClusterIncompleteErr incomplete results JSClusterIncompleteErr ErrorIdentifier = 10004 - // JSClusterNoPeersErr no suitable peers for placement - JSClusterNoPeersErr ErrorIdentifier = 10005 + // JSClusterNoPeersErrF no suitable peers for placement: {err} + JSClusterNoPeersErrF ErrorIdentifier = 10005 // JSClusterNotActiveErr JetStream not in clustered mode JSClusterNotActiveErr ErrorIdentifier = 10006 @@ -398,7 +398,7 @@ var ( JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"}, JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"}, - JSClusterNoPeersErr: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement"}, + JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement: {err}"}, JSClusterNotActiveErr: {Code: 500, ErrCode: 10006, Description: "JetStream not in clustered mode"}, JSClusterNotAssignedErr: {Code: 500, ErrCode: 10007, Description: "JetStream cluster not assigned to this server"}, JSClusterNotAvailErr: {Code: 503, ErrCode: 10008, Description: "JetStream system temporarily unavailable"}, @@ -579,14 +579,20 @@ func NewJSClusterIncompleteError(opts ...ErrorOption) *ApiError { return ApiErrors[JSClusterIncompleteErr] } -// NewJSClusterNoPeersError creates a new JSClusterNoPeersErr error: "no suitable peers for placement" -func NewJSClusterNoPeersError(opts ...ErrorOption) *ApiError { +// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "no suitable peers for placement: {err}" +func NewJSClusterNoPeersError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) if ae, ok := eopts.err.(*ApiError); ok { return ae } - return ApiErrors[JSClusterNoPeersErr] + e := ApiErrors[JSClusterNoPeersErrF] + args := e.toReplacerArgs([]interface{}{"{err}", err}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } } // NewJSClusterNotActiveError creates a new JSClusterNotActiveErr error: "JetStream not in clustered mode" diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go index 2cf40072..5343bbb9 100644 --- a/server/jetstream_leafnode_test.go +++ b/server/jetstream_leafnode_test.go @@ -484,7 +484,7 @@ leafnodes:{ require_NoError(t, err) } else { require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") } ncL := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sLA.opts.Port)) defer ncL.Close() @@ -499,7 +499,7 @@ leafnodes:{ require_NoError(t, err) } else { require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") } } clusterLnCnt := func(expected int) error { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 3b1cc402..a4612620 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -180,7 +180,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) { si, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement}) if test.fail { require_Error(t, err) - require_Equal(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") return } require_NoError(t, err) @@ -848,8 +848,8 @@ func TestJetStreamSuperClusterLeafNodesWithSharedSystemAccountAndDifferentDomain Replicas: 2, Placement: &nats.Placement{Cluster: pcn}, }) - if err == nil || !strings.Contains(err.Error(), "insufficient resources") { - t.Fatalf("Expected insufficient resources, got: %v", err) + if err == nil || !strings.Contains(err.Error(), "no suitable peers for placement") { + t.Fatalf("Expected no suitable peers for placement, got: %v", err) } } @@ -1380,8 +1380,7 @@ func TestJetStreamSuperClusterOverflowPlacement(t *testing.T) { MaxBytes: 2 * 1024 * 1024 * 1024, Placement: &nats.Placement{Cluster: pcn}, }) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) - + require_Contains(t, err.Error(), "no suitable peers for placement") // Now test actual overflow placement. So try again with no placement designation. // This will test the peer picker's logic since they are updated at this point and the meta leader // knows it can not place it in C2. @@ -1497,7 +1496,7 @@ func TestJetStreamSuperClusterStreamTagPlacement(t *testing.T) { Subjects: []string{"foo"}, Placement: &nats.Placement{Tags: tags}, }) - require_Error(t, err, NewJSInsufficientResourcesError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } placeErr("C1", []string{"cloud:GCP", "country:US"}) @@ -2381,7 +2380,8 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { waitStatsz(3, 1) _, err = js.AddStream(&nats.StreamConfig{Name: "S3", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}}) require_Error(t, err) - require_Equal(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "misc: 3") require_NoError(t, js.DeleteStream("S1")) waitStatsz(3, 2) waitStatsz(3, 1) @@ -2414,7 +2414,8 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { require_Equal(t, err.Error(), "insufficient resources") _, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}}) require_Error(t, err) - require_Equal(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "misc: 3") } func TestJetStreamSuperClusterStreamAlternates(t *testing.T) { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index b2a1d6bd..3ecedc0e 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -18,6 +18,7 @@ import ( "bytes" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -5753,8 +5754,8 @@ func TestMQTTStreamReplicasInsufficientResources(t *testing.T) { select { case e := <-l.errCh: - if !strings.Contains(e, NewJSInsufficientResourcesError().Description) { - t.Fatalf("Expected error regarding insufficient resources, got %v", e) + if !strings.Contains(e, fmt.Sprintf("%d", NewJSClusterNoPeersError(errors.New("")).ErrCode)) { + t.Fatalf("Expected error regarding no peers error, got %v", e) } case <-time.After(2 * time.Second): t.Fatalf("Did not get the error regarding replicas count")