diff --git a/server/errors.json b/server/errors.json index cf464f62..e9cdc17d 100644 --- a/server/errors.json +++ b/server/errors.json @@ -83,8 +83,8 @@ "constant": "JSClusterNoPeersErrF", "code": 400, "error_code": 10005, - "description": "no suitable peers for placement: {err}", - "comment": "", + "description": "{err}", + "comment": "Error causing no peers to be available", "help": "", "url": "", "deprecates": "" @@ -1299,4 +1299,4 @@ "url": "", "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 22954a7b..0be83e5f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2395,18 +2395,19 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ } return true }) - errs := selectPeerErrors{e} + errs := &selectPeerError{} + errs.accumulate(e) for cluster := range clusters { - newPeers, _ := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) + newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) if len(newPeers) >= cfg.Replicas { peers = append([]string{}, currPeers...) peers = append(peers, newPeers[:cfg.Replicas]...) break } - errs = append(errs, e) + errs.accumulate(e) } if peers == nil { - resp.Error = NewJSClusterNoPeersError(&errs) + resp.Error = NewJSClusterNoPeersError(errs) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index fe13ac27..ae8d87ad 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4544,43 +4544,80 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe } type selectPeerError struct { - cluster string - clusterPeers int - offline int - excludeTag int - noTagMatch int - noStorage int - uniqueTag int - misc int + excludeTag bool + offline bool + noStorage bool + uniqueTag bool + misc bool + noJsClust bool + noMatchTags map[string]struct{} } func (e *selectPeerError) Error() string { - return fmt.Sprintf(`peer selection cluster '%s' with %d peers -offline: %d -excludeTag: %d -noTagMatch: %d -noStorage: %d -uniqueTag: %d -misc: %d -`, - e.cluster, e.clusterPeers, e.offline, e.excludeTag, e.noTagMatch, e.noStorage, e.uniqueTag, e.misc) + b := strings.Builder{} + writeBoolErrReason := func(hasErr bool, errMsg string) { + if !hasErr { + return + } + b.WriteString(", ") + b.WriteString(errMsg) + } + b.WriteString("no suitable peers for placement") + writeBoolErrReason(e.offline, "peer offline") + writeBoolErrReason(e.excludeTag, "exclude tag set") + writeBoolErrReason(e.noStorage, "insufficient storage") + writeBoolErrReason(e.uniqueTag, "server tag not unique") + writeBoolErrReason(e.misc, "miscellaneous issue") + writeBoolErrReason(e.noJsClust, "jetstream not enabled in cluster") + if len(e.noMatchTags) != 0 { + b.WriteString(", tags not matched [") + var firstTagWritten bool + for tag := range e.noMatchTags { + if firstTagWritten { + b.WriteString(", ") + } + firstTagWritten = true + b.WriteRune('\'') + b.WriteString(tag) + b.WriteRune('\'') + } + b.WriteString("]") + } + return b.String() } -type selectPeerErrors []*selectPeerError - -func (e *selectPeerErrors) Error() string { - errors := make([]string, len(*e)) - for i, err := range *e { - errors[i] = err.Error() +func (e *selectPeerError) addMissingTag(t string) { + if e.noMatchTags == nil { + e.noMatchTags = map[string]struct{}{} + } + e.noMatchTags[t] = struct{}{} +} + +func (e *selectPeerError) accumulate(eAdd *selectPeerError) { + if eAdd == nil { + return + } + acc := func(val *bool, valAdd bool) { + if valAdd { + *val = valAdd + } + } + acc(&e.offline, eAdd.offline) + acc(&e.excludeTag, eAdd.excludeTag) + acc(&e.noStorage, eAdd.noStorage) + acc(&e.uniqueTag, eAdd.uniqueTag) + acc(&e.misc, eAdd.misc) + acc(&e.noJsClust, eAdd.noJsClust) + for tag := range eAdd.noMatchTags { + e.addMissingTag(tag) } - return strings.Join(errors, "\n") } // selectPeerGroup will select a group of peers to start a raft group. // when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) ([]string, *selectPeerError) { if cluster == _EMPTY_ || cfg == nil { - return nil, &selectPeerError{cluster: cluster, misc: 1} + return nil, &selectPeerError{misc: true} } var maxBytes uint64 @@ -4658,14 +4695,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo // An error is a result of multiple individual placement decisions. // Which is why we keep taps on how often which one happened. - err := selectPeerError{cluster: cluster} + err := selectPeerError{} // Shuffle them up. rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) for _, p := range peers { si, ok := s.nodeToInfo.Load(p.ID) if !ok || si == nil { - err.misc++ + err.misc = true continue } ni := si.(nodeInfo) @@ -4674,12 +4711,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo s.Debugf("Peer selection: discard %s@%s reason: not target cluster %s", ni.name, ni.cluster, cluster) continue } - err.clusterPeers++ // If we know its offline or we do not have config or err don't consider. if ni.offline || ni.cfg == nil || ni.stats == nil { s.Debugf("Peer selection: discard %s@%s reason: offline", ni.name, ni.cluster) - err.offline++ + err.offline = true continue } @@ -4693,7 +4729,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo if ni.tags.Contains(jsExcludePlacement) { s.Debugf("Peer selection: discard %s@%s tags: %v reason: %s present", ni.name, ni.cluster, ni.tags, jsExcludePlacement) - err.excludeTag++ + err.excludeTag = true continue } @@ -4704,11 +4740,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo matched = false s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present", ni.name, ni.cluster, ni.tags, t) + err.addMissingTag(t) break } } if !matched { - err.noTagMatch++ continue } } @@ -4741,14 +4777,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo if maxBytes > 0 && maxBytes > available { s.Warnf("Peer selection: discard %s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", ni.name, ni.cluster, maxBytes, cfg.Storage.String(), available) - err.noStorage++ + err.noStorage = true continue } // HAAssets contain _meta_ which we want to ignore if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets { s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets) - err.misc++ + err.misc = true continue } @@ -4761,7 +4797,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s not present", ni.name, ni.cluster, ni.tags) } - err.uniqueTag++ + err.uniqueTag = true continue } } @@ -4773,6 +4809,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo if len(nodes) < (r - len(existing)) { s.Debugf("Peer selection: required %d nodes but found %d (cluster: %s replica: %d existing: %v/%d peers: %d result-peers: %d err: %+v)", (r - len(existing)), len(nodes), cluster, r, existing, replaceFirstExisting, len(peers), len(nodes), err) + if len(peers) == 0 { + err.noJsClust = true + } return nil, &err } // Sort based on available from most to least. @@ -4838,7 +4877,7 @@ func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier stri // createGroupForStream will create a group for assignment for the stream. // Lock should be held. -func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerErrors) { +func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerError) { replicas := cfg.Replicas if replicas == 0 { replicas = 1 @@ -4857,16 +4896,16 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r } // Need to create a group here. - errFirst := selectPeerErrors{} + errs := &selectPeerError{} for _, cn := range clusters { peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0) if len(peers) < replicas { - errFirst = append(errFirst, err) + errs.accumulate(err) continue } return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}, nil } - return nil, &errFirst + return nil, errs } func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a914bb07..b4de6873 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -148,7 +148,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { Replicas: 2, MaxBytes: 15 * 1024 * 1024, }) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage") } func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { @@ -3644,8 +3644,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { c.Subjects = []string{c.Name} _, err := js.AddStream(&c) require_Error(t, err) - require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", - "excludeTag: 1", "offline: 0", "uniqueTag: 0") + require_Contains(t, err.Error(), "no suitable peers for placement", "exclude tag set") } // Test update failure @@ -3656,8 +3655,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { cfg.Replicas = 3 _, err = js.UpdateStream(cfg) require_Error(t, err) - require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", - "excludeTag: 1", "offline: 0", "uniqueTag: 0") + require_Contains(t, err.Error(), "no suitable peers for placement", "exclude tag set") // Test tag reload removing !jetstream tag, and allowing placement again srv := c.serverByName("S-1") @@ -9579,7 +9577,7 @@ func TestJetStreamClusterBalancedPlacement(t *testing.T) { Replicas: 2, MaxBytes: 1 * 1024 * 1024 * 1024, }) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage") } func TestJetStreamClusterConsumerPendingBug(t *testing.T) { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index cd8cb5eb..b6067308 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -14,7 +14,7 @@ const ( // JSClusterIncompleteErr incomplete results JSClusterIncompleteErr ErrorIdentifier = 10004 - // JSClusterNoPeersErrF no suitable peers for placement: {err} + // JSClusterNoPeersErrF Error causing no peers to be available ({err}) JSClusterNoPeersErrF ErrorIdentifier = 10005 // JSClusterNotActiveErr JetStream not in clustered mode @@ -401,7 +401,7 @@ var ( JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"}, JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"}, - JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement: {err}"}, + JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "{err}"}, JSClusterNotActiveErr: {Code: 500, ErrCode: 10006, Description: "JetStream not in clustered mode"}, JSClusterNotAssignedErr: {Code: 500, ErrCode: 10007, Description: "JetStream cluster not assigned to this server"}, JSClusterNotAvailErr: {Code: 503, ErrCode: 10008, Description: "JetStream system temporarily unavailable"}, @@ -583,7 +583,7 @@ func NewJSClusterIncompleteError(opts ...ErrorOption) *ApiError { return ApiErrors[JSClusterIncompleteErr] } -// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "no suitable peers for placement: {err}" +// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "{err}" func NewJSClusterNoPeersError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) if ae, ok := eopts.err.(*ApiError); ok { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index d56a08d1..e31ce16d 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -79,7 +79,7 @@ func TestJetStreamSuperClusterMetaPlacement(t *testing.T) { // Make sure we get correct errors for tags and bad or unavailable cluster placement. sdr := stepdown("C22") - if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no suitable peers") { + if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no replacement peer connected") { t.Fatalf("Got incorrect error result: %+v", sdr.Error) } // Should work. @@ -184,7 +184,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) { si, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement}) if test.fail { require_Error(t, err) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "server tag not unique") return } require_NoError(t, err) @@ -1503,7 +1503,8 @@ func TestJetStreamSuperClusterStreamTagPlacement(t *testing.T) { Subjects: []string{"foo"}, Placement: &nats.Placement{Tags: tags}, }) - require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "no suitable peers for placement", "tags not matched") + require_Contains(t, err.Error(), tags...) } placeErr("C1", []string{"cloud:GCP", "country:US"}) @@ -2393,7 +2394,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { _, err = js.AddStream(&nats.StreamConfig{Name: "S3", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}}) require_Error(t, err) require_Contains(t, err.Error(), "nats: no suitable peers for placement") - require_Contains(t, err.Error(), "misc: 3") + require_Contains(t, err.Error(), "miscellaneous issue") require_NoError(t, js.DeleteStream("S1")) waitStatsz(3, 2) waitStatsz(3, 1) @@ -2427,7 +2428,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { _, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}}) require_Error(t, err) require_Contains(t, err.Error(), "nats: no suitable peers for placement") - require_Contains(t, err.Error(), "misc: 3") + require_Contains(t, err.Error(), "miscellaneous issue") } func TestJetStreamSuperClusterStreamAlternates(t *testing.T) {