From f7cb5b1f0d8802655b789042188db0679a1094d3 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 8 Sep 2022 18:25:48 -0700 Subject: [PATCH] changed format of JSClusterNoPeers error (#3459) * changed format of JSClusterNoPeers error This error was introduced in #3342 and reveals too much information This change gets rid of cluster names and peer counts. All other counts were changed to booleans, which are only included in the output when the filter was hit. In addition, the set of not matching tags is included. Furthermore, the static error description in server/errors.json is moved into selectPeerError sample errors: 1) no suitable peers for placement, tags not matched ['cloud:GCP', 'country:US'] 2) no suitable peers for placement, insufficient storage Signed-off-by: Matthias Hanel Signed-off-by: Ivan Kozlovic Co-authored-by: Ivan Kozlovic --- server/errors.json | 6 +- server/jetstream_api.go | 9 +- server/jetstream_cluster.go | 115 +++++++++++++++++-------- server/jetstream_cluster_test.go | 10 +-- server/jetstream_errors_generated.go | 6 +- server/jetstream_super_cluster_test.go | 11 +-- 6 files changed, 98 insertions(+), 59 deletions(-) diff --git a/server/errors.json b/server/errors.json index cf464f62..e9cdc17d 100644 --- a/server/errors.json +++ b/server/errors.json @@ -83,8 +83,8 @@ "constant": "JSClusterNoPeersErrF", "code": 400, "error_code": 10005, - "description": "no suitable peers for placement: {err}", - "comment": "", + "description": "{err}", + "comment": "Error causing no peers to be available", "help": "", "url": "", "deprecates": "" @@ -1299,4 +1299,4 @@ "url": "", "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 22954a7b..0be83e5f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2395,18 +2395,19 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ } return true }) - errs := selectPeerErrors{e} + errs := 
&selectPeerError{} + errs.accumulate(e) for cluster := range clusters { - newPeers, _ := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) + newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) if len(newPeers) >= cfg.Replicas { peers = append([]string{}, currPeers...) peers = append(peers, newPeers[:cfg.Replicas]...) break } - errs = append(errs, e) + errs.accumulate(e) } if peers == nil { - resp.Error = NewJSClusterNoPeersError(&errs) + resp.Error = NewJSClusterNoPeersError(errs) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index fe13ac27..ae8d87ad 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4544,43 +4544,80 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe } type selectPeerError struct { - cluster string - clusterPeers int - offline int - excludeTag int - noTagMatch int - noStorage int - uniqueTag int - misc int + excludeTag bool + offline bool + noStorage bool + uniqueTag bool + misc bool + noJsClust bool + noMatchTags map[string]struct{} } func (e *selectPeerError) Error() string { - return fmt.Sprintf(`peer selection cluster '%s' with %d peers -offline: %d -excludeTag: %d -noTagMatch: %d -noStorage: %d -uniqueTag: %d -misc: %d -`, - e.cluster, e.clusterPeers, e.offline, e.excludeTag, e.noTagMatch, e.noStorage, e.uniqueTag, e.misc) + b := strings.Builder{} + writeBoolErrReason := func(hasErr bool, errMsg string) { + if !hasErr { + return + } + b.WriteString(", ") + b.WriteString(errMsg) + } + b.WriteString("no suitable peers for placement") + writeBoolErrReason(e.offline, "peer offline") + writeBoolErrReason(e.excludeTag, "exclude tag set") + writeBoolErrReason(e.noStorage, "insufficient storage") + writeBoolErrReason(e.uniqueTag, "server tag not unique") + writeBoolErrReason(e.misc, "miscellaneous issue") + writeBoolErrReason(e.noJsClust, "jetstream not 
enabled in cluster") + if len(e.noMatchTags) != 0 { + b.WriteString(", tags not matched [") + var firstTagWritten bool + for tag := range e.noMatchTags { + if firstTagWritten { + b.WriteString(", ") + } + firstTagWritten = true + b.WriteRune('\'') + b.WriteString(tag) + b.WriteRune('\'') + } + b.WriteString("]") + } + return b.String() } -type selectPeerErrors []*selectPeerError - -func (e *selectPeerErrors) Error() string { - errors := make([]string, len(*e)) - for i, err := range *e { - errors[i] = err.Error() +func (e *selectPeerError) addMissingTag(t string) { + if e.noMatchTags == nil { + e.noMatchTags = map[string]struct{}{} + } + e.noMatchTags[t] = struct{}{} +} + +func (e *selectPeerError) accumulate(eAdd *selectPeerError) { + if eAdd == nil { + return + } + acc := func(val *bool, valAdd bool) { + if valAdd { + *val = valAdd + } + } + acc(&e.offline, eAdd.offline) + acc(&e.excludeTag, eAdd.excludeTag) + acc(&e.noStorage, eAdd.noStorage) + acc(&e.uniqueTag, eAdd.uniqueTag) + acc(&e.misc, eAdd.misc) + acc(&e.noJsClust, eAdd.noJsClust) + for tag := range eAdd.noMatchTags { + e.addMissingTag(tag) } - return strings.Join(errors, "\n") } // selectPeerGroup will select a group of peers to start a raft group. // when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) ([]string, *selectPeerError) { if cluster == _EMPTY_ || cfg == nil { - return nil, &selectPeerError{cluster: cluster, misc: 1} + return nil, &selectPeerError{misc: true} } var maxBytes uint64 @@ -4658,14 +4695,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo // An error is a result of multiple individual placement decisions. // Which is why we keep taps on how often which one happened. - err := selectPeerError{cluster: cluster} + err := selectPeerError{} // Shuffle them up. 
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) for _, p := range peers { si, ok := s.nodeToInfo.Load(p.ID) if !ok || si == nil { - err.misc++ + err.misc = true continue } ni := si.(nodeInfo) @@ -4674,12 +4711,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo s.Debugf("Peer selection: discard %s@%s reason: not target cluster %s", ni.name, ni.cluster, cluster) continue } - err.clusterPeers++ // If we know its offline or we do not have config or err don't consider. if ni.offline || ni.cfg == nil || ni.stats == nil { s.Debugf("Peer selection: discard %s@%s reason: offline", ni.name, ni.cluster) - err.offline++ + err.offline = true continue } @@ -4693,7 +4729,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo if ni.tags.Contains(jsExcludePlacement) { s.Debugf("Peer selection: discard %s@%s tags: %v reason: %s present", ni.name, ni.cluster, ni.tags, jsExcludePlacement) - err.excludeTag++ + err.excludeTag = true continue } @@ -4704,11 +4740,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo matched = false s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present", ni.name, ni.cluster, ni.tags, t) + err.addMissingTag(t) break } } if !matched { - err.noTagMatch++ continue } } @@ -4741,14 +4777,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo if maxBytes > 0 && maxBytes > available { s.Warnf("Peer selection: discard %s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", ni.name, ni.cluster, maxBytes, cfg.Storage.String(), available) - err.noStorage++ + err.noStorage = true continue } // HAAssets contain _meta_ which we want to ignore if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets { s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", ni.name, ni.cluster, 
ni.stats.HAAssets, maxHaAssets) - err.misc++ + err.misc = true continue } @@ -4761,7 +4797,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s not present", ni.name, ni.cluster, ni.tags) } - err.uniqueTag++ + err.uniqueTag = true continue } } @@ -4773,6 +4809,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo if len(nodes) < (r - len(existing)) { s.Debugf("Peer selection: required %d nodes but found %d (cluster: %s replica: %d existing: %v/%d peers: %d result-peers: %d err: %+v)", (r - len(existing)), len(nodes), cluster, r, existing, replaceFirstExisting, len(peers), len(nodes), err) + if len(peers) == 0 { + err.noJsClust = true + } return nil, &err } // Sort based on available from most to least. @@ -4838,7 +4877,7 @@ func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier stri // createGroupForStream will create a group for assignment for the stream. // Lock should be held. -func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerErrors) { +func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerError) { replicas := cfg.Replicas if replicas == 0 { replicas = 1 @@ -4857,16 +4896,16 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r } // Need to create a group here. 
- errFirst := selectPeerErrors{} + errs := &selectPeerError{} for _, cn := range clusters { peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0) if len(peers) < replicas { - errFirst = append(errFirst, err) + errs.accumulate(err) continue } return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}, nil } - return nil, &errFirst + return nil, errs } func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a914bb07..b4de6873 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -148,7 +148,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { Replicas: 2, MaxBytes: 15 * 1024 * 1024, }) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage") } func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { @@ -3644,8 +3644,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { c.Subjects = []string{c.Name} _, err := js.AddStream(&c) require_Error(t, err) - require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", - "excludeTag: 1", "offline: 0", "uniqueTag: 0") + require_Contains(t, err.Error(), "no suitable peers for placement", "exclude tag set") } // Test update failure @@ -3656,8 +3655,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { cfg.Replicas = 3 _, err = js.UpdateStream(cfg) require_Error(t, err) - require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", - "excludeTag: 1", "offline: 0", "uniqueTag: 0") + require_Contains(t, err.Error(), "no suitable peers for placement", "exclude tag set") // Test tag reload removing !jetstream tag, and allowing placement again srv := c.serverByName("S-1") @@ -9579,7 +9577,7 @@ func 
TestJetStreamClusterBalancedPlacement(t *testing.T) { Replicas: 2, MaxBytes: 1 * 1024 * 1024 * 1024, }) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage") } func TestJetStreamClusterConsumerPendingBug(t *testing.T) { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index cd8cb5eb..b6067308 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -14,7 +14,7 @@ const ( // JSClusterIncompleteErr incomplete results JSClusterIncompleteErr ErrorIdentifier = 10004 - // JSClusterNoPeersErrF no suitable peers for placement: {err} + // JSClusterNoPeersErrF Error causing no peers to be available ({err}) JSClusterNoPeersErrF ErrorIdentifier = 10005 // JSClusterNotActiveErr JetStream not in clustered mode @@ -401,7 +401,7 @@ var ( JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"}, JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"}, - JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement: {err}"}, + JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "{err}"}, JSClusterNotActiveErr: {Code: 500, ErrCode: 10006, Description: "JetStream not in clustered mode"}, JSClusterNotAssignedErr: {Code: 500, ErrCode: 10007, Description: "JetStream cluster not assigned to this server"}, JSClusterNotAvailErr: {Code: 503, ErrCode: 10008, Description: "JetStream system temporarily unavailable"}, @@ -583,7 +583,7 @@ func NewJSClusterIncompleteError(opts ...ErrorOption) *ApiError { return ApiErrors[JSClusterIncompleteErr] } -// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "no suitable peers for placement: {err}" +// NewJSClusterNoPeersError creates a new 
JSClusterNoPeersErrF error: "{err}" func NewJSClusterNoPeersError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) if ae, ok := eopts.err.(*ApiError); ok { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index d56a08d1..e31ce16d 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -79,7 +79,7 @@ func TestJetStreamSuperClusterMetaPlacement(t *testing.T) { // Make sure we get correct errors for tags and bad or unavailable cluster placement. sdr := stepdown("C22") - if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no suitable peers") { + if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no replacement peer connected") { t.Fatalf("Got incorrect error result: %+v", sdr.Error) } // Should work. @@ -184,7 +184,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) { si, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement}) if test.fail { require_Error(t, err) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "server tag not unique") return } require_NoError(t, err) @@ -1503,7 +1503,8 @@ func TestJetStreamSuperClusterStreamTagPlacement(t *testing.T) { Subjects: []string{"foo"}, Placement: &nats.Placement{Tags: tags}, }) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "tags not matched") + require_Contains(t, err.Error(), tags...) 
} placeErr("C1", []string{"cloud:GCP", "country:US"}) @@ -2393,7 +2394,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { _, err = js.AddStream(&nats.StreamConfig{Name: "S3", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}}) require_Error(t, err) require_Contains(t, err.Error(), "nats: no suitable peers for placement") - require_Contains(t, err.Error(), "misc: 3") + require_Contains(t, err.Error(), "miscellaneous issue") require_NoError(t, js.DeleteStream("S1")) waitStatsz(3, 2) waitStatsz(3, 1) @@ -2427,7 +2428,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { _, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}}) require_Error(t, err) require_Contains(t, err.Error(), "nats: no suitable peers for placement") - require_Contains(t, err.Error(), "misc: 3") + require_Contains(t, err.Error(), "miscellaneous issue") } func TestJetStreamSuperClusterStreamAlternates(t *testing.T) {