diff --git a/server/errors.json b/server/errors.json index 530f6e70..5730766e 100644 --- a/server/errors.json +++ b/server/errors.json @@ -80,10 +80,10 @@ "deprecates": "" }, { - "constant": "JSClusterNoPeersErr", + "constant": "JSClusterNoPeersErrF", "code": 400, "error_code": 10005, - "description": "no suitable peers for placement", + "description": "no suitable peers for placement: {err}", "comment": "", "help": "", "url": "", diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c8f0921c..80ae12a5 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2357,7 +2357,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...) } - peers := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1) + peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1) if len(peers) <= cfg.Replicas { // since expanding in the same cluster did not yield a result, try in different cluster peers = nil @@ -2369,16 +2369,18 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ } return true }) + errs := selectPeerErrors{e} for cluster := range clusters { - newPeers := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) + newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0) if len(newPeers) >= cfg.Replicas { peers = append([]string{}, currPeers...) peers = append(peers, newPeers[:cfg.Replicas]...) 
break } + errs = append(errs, e) } if peers == nil { - resp.Error = NewJSClusterNoPeersError() + resp.Error = NewJSClusterNoPeersError(&errs) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -2619,7 +2621,7 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun } } if len(peers) == 0 { - resp.Error = NewJSClusterNoPeersError() + resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected")) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index bc5d23f0..a47dc1ac 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4245,7 +4245,9 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client if cfg.Placement == nil || cfg.Placement.Cluster == _EMPTY_ { // If we have additional clusters to try we can retry. if ci != nil && len(ci.Alternates) > 0 { - if rg := js.createGroupForStream(ci, cfg); rg != nil { + if rg, err := js.createGroupForStream(ci, cfg); err != nil { + s.Warnf("Retrying cluster placement for stream '%s > %s' failed due to placement error: %+v", result.Account, result.Stream, err) + } else { if org := sa.Group; org != nil && len(org.Peers) > 0 { s.Warnf("Retrying cluster placement for stream '%s > %s' due to insufficient resources in cluster %q", result.Account, result.Stream, s.clusterNameForNode(org.Peers[0])) @@ -4476,11 +4478,44 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe return false } +type selectPeerError struct { + cluster string + clusterPeers int + offline int + excludeTag int + noTagMatch int + noStorage int + uniqueTag int + misc int +} + +func (e *selectPeerError) Error() string { + return fmt.Sprintf(`peer selection cluster '%s' with %d peers +offline: %d +excludeTag: %d +noTagMatch: %d +noStorage: %d +uniqueTag: %d +misc: %d +`, + e.cluster, 
e.clusterPeers, e.offline, e.excludeTag, e.noTagMatch, e.noStorage, e.uniqueTag, e.misc) +} + +type selectPeerErrors []*selectPeerError + +func (e *selectPeerErrors) Error() string { + errors := make([]string, len(*e)) + for i, err := range *e { + errors[i] = err.Error() + } + return strings.Join(errors, "\n") +} + // selectPeerGroup will select a group of peers to start a raft group. // when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped -func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) (re []string) { +func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) ([]string, *selectPeerError) { if cluster == _EMPTY_ || cfg == nil { - return nil + return nil, &selectPeerError{cluster: cluster, misc: 1} } var maxBytes uint64 @@ -4515,28 +4550,28 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo } } } - var uniqueTags = make(map[string]struct{}) + var uniqueTags = make(map[string]*nodeInfo) - checkUniqueTag := func(ni *nodeInfo) bool { - // default requires the unique prefix to be present - isUnique := false + checkUniqueTag := func(ni *nodeInfo) (bool, *nodeInfo) { for _, t := range ni.tags { if strings.HasPrefix(t, uniqueTagPrefix) { - if _, ok := uniqueTags[t]; !ok { - uniqueTags[t] = struct{}{} - isUnique = true + if n, ok := uniqueTags[t]; !ok { + uniqueTags[t] = ni + return true, ni + } else { + return false, n } - break } } - return isUnique + // default requires the unique prefix to be present + return false, nil } // Map existing. 
var ep map[string]struct{} if le := len(existing); le > 0 { if le >= r { - return existing + return existing[:r], nil } ep = make(map[string]struct{}) for i, p := range existing { @@ -4556,17 +4591,30 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets + // An error is a result of multiple individual placement decisions. + // Which is why we keep tabs on how often which one happened. + err := selectPeerError{cluster: cluster} + // Shuffle them up. rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) for _, p := range peers { si, ok := s.nodeToInfo.Load(p.ID) if !ok || si == nil { + err.misc++ continue } ni := si.(nodeInfo) // Only select from the designated named cluster. - // If we know its offline or we do not have config or stats don't consider. - if ni.cluster != cluster || ni.offline || ni.cfg == nil || ni.stats == nil { + if ni.cluster != cluster { + s.Debugf("Peer selection: discard %s@%s reason: not target cluster %s", ni.name, ni.cluster, cluster) + continue + } + err.clusterPeers++ + + // If we know its offline or we do not have config or stats don't consider. 
+ if ni.offline || ni.cfg == nil || ni.stats == nil { + s.Debugf("Peer selection: discard %s@%s reason: offline", ni.name, ni.cluster) + err.offline++ continue } @@ -4578,6 +4626,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo } if ni.tags.Contains(jsExcludePlacement) { + s.Debugf("Peer selection: discard %s@%s tags: %v reason: %s present", + ni.name, ni.cluster, ni.tags, jsExcludePlacement) + err.excludeTag++ continue } @@ -4586,10 +4637,13 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo for _, t := range tags { if !ni.tags.Contains(t) { matched = false + s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present", + ni.name, ni.cluster, ni.tags, t) break } } if !matched { + err.noTagMatch++ continue } } @@ -4620,19 +4674,31 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo // Otherwise check if we have enough room if maxBytes set. if maxBytes > 0 && maxBytes > available { - s.Warnf("%s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", + s.Warnf("Peer selection: discard %s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", ni.name, ni.cluster, maxBytes, cfg.Storage.String(), available) + err.noStorage++ continue } // HAAssets contain _meta_ which we want to ignore if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets { - s.Warnf("%s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", + s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets) + err.misc++ continue } - if uniqueTagPrefix != _EMPTY_ && !checkUniqueTag(&ni) { - continue + if uniqueTagPrefix != _EMPTY_ { + if unique, owner := checkUniqueTag(&ni); !unique { + if owner != nil { + s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s owned by %s@%s", + ni.name, ni.cluster, 
ni.tags, uniqueTagPrefix, owner.name, owner.cluster) + } else { + s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s not present", + ni.name, ni.cluster, ni.tags, uniqueTagPrefix) + } + err.uniqueTag++ + continue + } } // Add to our list of potential nodes. nodes = append(nodes, wn{p.ID, available, ha}) @@ -4640,7 +4706,9 @@ // If we could not select enough peers, fail. if len(nodes) < (r - len(existing)) { - return nil + s.Debugf("Peer selection: required %d nodes but found %d (cluster: %s replica: %d existing: %v/%d peers: %d result-peers: %d err: %+v)", + (r - len(existing)), len(nodes), cluster, r, existing, replaceFirstExisting, len(peers), len(nodes), err) + return nil, &err } // Sort based on available from most to least. sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail }) @@ -4658,7 +4726,7 @@ for _, r := range nodes[:r] { results = append(results, r.id) } - return results + return results, nil } func groupNameForStream(peers []string, storage StorageType) string { @@ -4705,7 +4773,7 @@ // createGroupForStream will create a group for assignment for the stream. // Lock should be held. -func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { +func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerErrors) { replicas := cfg.Replicas if replicas == 0 { replicas = 1 @@ -4724,14 +4792,16 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *ra } // Need to create a group here. 
+ errFirst := selectPeerErrors{} for _, cn := range clusters { - peers := cc.selectPeerGroup(replicas, cn, cfg, nil, 0) + peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0) if len(peers) < replicas { + errFirst = append(errFirst, err) continue } - return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn} + return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}, nil } - return nil + return nil, &errFirst } func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) { @@ -4847,9 +4917,9 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } // Raft group selection and placement. - rg := js.createGroupForStream(ci, cfg) - if rg == nil { - resp.Error = NewJSInsufficientResourcesError() + rg, err := js.createGroupForStream(ci, cfg) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -5016,9 +5086,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su if isReplicaChange { // We are adding new peers here. 
if newCfg.Replicas > len(rg.Peers) { - peers := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0) - if len(peers) != newCfg.Replicas { - resp.Error = NewJSInsufficientResourcesError() + peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -5089,9 +5159,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } } else if isMoveRequest { if len(peerSet) == 0 { - nrg := js.createGroupForStream(ci, newCfg) - if nrg == nil { - resp.Error = NewJSInsufficientResourcesError() + nrg, err := js.createGroupForStream(ci, newCfg) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -5275,9 +5345,9 @@ func (s *Server) jsClusteredStreamRestoreRequest( } // Raft group selection and placement. 
- rg := js.createGroupForStream(ci, cfg) - if rg == nil { - resp.Error = NewJSInsufficientResourcesError() + rg, err := js.createGroupForStream(ci, cfg) + if err != nil { + resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5f91d74b..b7e60f87 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -148,7 +148,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { Replicas: 2, MaxBytes: 15 * 1024 * 1024, }) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { @@ -1047,7 +1047,7 @@ func TestJetStreamClusterMaxBytesForStream(t *testing.T) { cfg.Name = "TEST2" cfg.MaxBytes *= 2 _, err = js.AddStream(cfg) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { @@ -3634,7 +3634,8 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { c.Subjects = []string{c.Name} _, err := js.AddStream(&c) require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", + "excludeTag: 1", "offline: 0", "uniqueTag: 0") } // Test update failure @@ -3645,8 +3646,8 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { cfg.Replicas = 3 _, err = js.UpdateStream(cfg) require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") - + require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers", + "excludeTag: 1", "offline: 0", "uniqueTag: 0") // Test tag reload removing 
!jetstream tag, and allowing placement again srv := c.serverByName("S-1") @@ -9398,7 +9399,7 @@ func TestJetStreamClusterBalancedPlacement(t *testing.T) { Replicas: 2, MaxBytes: 1 * 1024 * 1024 * 1024, }) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } func TestJetStreamClusterConsumerPendingBug(t *testing.T) { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index c5de171f..ce9f62ac 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -14,8 +14,8 @@ const ( // JSClusterIncompleteErr incomplete results JSClusterIncompleteErr ErrorIdentifier = 10004 - // JSClusterNoPeersErr no suitable peers for placement - JSClusterNoPeersErr ErrorIdentifier = 10005 + // JSClusterNoPeersErrF no suitable peers for placement: {err} + JSClusterNoPeersErrF ErrorIdentifier = 10005 // JSClusterNotActiveErr JetStream not in clustered mode JSClusterNotActiveErr ErrorIdentifier = 10006 @@ -398,7 +398,7 @@ var ( JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"}, JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"}, - JSClusterNoPeersErr: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement"}, + JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement: {err}"}, JSClusterNotActiveErr: {Code: 500, ErrCode: 10006, Description: "JetStream not in clustered mode"}, JSClusterNotAssignedErr: {Code: 500, ErrCode: 10007, Description: "JetStream cluster not assigned to this server"}, JSClusterNotAvailErr: {Code: 503, ErrCode: 10008, Description: "JetStream system temporarily unavailable"}, @@ -579,14 +579,20 @@ func NewJSClusterIncompleteError(opts ...ErrorOption) *ApiError { 
return ApiErrors[JSClusterIncompleteErr] } -// NewJSClusterNoPeersError creates a new JSClusterNoPeersErr error: "no suitable peers for placement" -func NewJSClusterNoPeersError(opts ...ErrorOption) *ApiError { +// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "no suitable peers for placement: {err}" +func NewJSClusterNoPeersError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) if ae, ok := eopts.err.(*ApiError); ok { return ae } - return ApiErrors[JSClusterNoPeersErr] + e := ApiErrors[JSClusterNoPeersErrF] + args := e.toReplacerArgs([]interface{}{"{err}", err}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } } // NewJSClusterNotActiveError creates a new JSClusterNotActiveErr error: "JetStream not in clustered mode" diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go index 2cf40072..5343bbb9 100644 --- a/server/jetstream_leafnode_test.go +++ b/server/jetstream_leafnode_test.go @@ -484,7 +484,7 @@ leafnodes:{ require_NoError(t, err) } else { require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") } ncL := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sLA.opts.Port)) defer ncL.Close() @@ -499,7 +499,7 @@ leafnodes:{ require_NoError(t, err) } else { require_Error(t, err) - require_Contains(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") } } clusterLnCnt := func(expected int) error { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 3b1cc402..a4612620 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -180,7 +180,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) { si, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: 
test.replicas, Placement: test.placement}) if test.fail { require_Error(t, err) - require_Equal(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") return } require_NoError(t, err) @@ -848,8 +848,8 @@ func TestJetStreamSuperClusterLeafNodesWithSharedSystemAccountAndDifferentDomain Replicas: 2, Placement: &nats.Placement{Cluster: pcn}, }) - if err == nil || !strings.Contains(err.Error(), "insufficient resources") { - t.Fatalf("Expected insufficient resources, got: %v", err) + if err == nil || !strings.Contains(err.Error(), "no suitable peers for placement") { + t.Fatalf("Expected no suitable peers for placement, got: %v", err) } } @@ -1380,8 +1380,7 @@ func TestJetStreamSuperClusterOverflowPlacement(t *testing.T) { MaxBytes: 2 * 1024 * 1024 * 1024, Placement: &nats.Placement{Cluster: pcn}, }) - require_Error(t, err, NewJSInsufficientResourcesError(), NewJSStorageResourcesExceededError()) - + require_Contains(t, err.Error(), "no suitable peers for placement") // Now test actual overflow placement. So try again with no placement designation. // This will test the peer picker's logic since they are updated at this point and the meta leader // knows it can not place it in C2. 
@@ -1497,7 +1496,7 @@ func TestJetStreamSuperClusterStreamTagPlacement(t *testing.T) { Subjects: []string{"foo"}, Placement: &nats.Placement{Tags: tags}, }) - require_Error(t, err, NewJSInsufficientResourcesError()) + require_Contains(t, err.Error(), "no suitable peers for placement") } placeErr("C1", []string{"cloud:GCP", "country:US"}) @@ -2381,7 +2380,8 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { waitStatsz(3, 1) _, err = js.AddStream(&nats.StreamConfig{Name: "S3", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}}) require_Error(t, err) - require_Equal(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "misc: 3") require_NoError(t, js.DeleteStream("S1")) waitStatsz(3, 2) waitStatsz(3, 1) @@ -2414,7 +2414,8 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { require_Equal(t, err.Error(), "insufficient resources") _, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}}) require_Error(t, err) - require_Equal(t, err.Error(), "insufficient resources") + require_Contains(t, err.Error(), "no suitable peers for placement") + require_Contains(t, err.Error(), "misc: 3") } func TestJetStreamSuperClusterStreamAlternates(t *testing.T) { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index b2a1d6bd..3ecedc0e 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -18,6 +18,7 @@ import ( "bytes" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -5753,8 +5754,8 @@ func TestMQTTStreamReplicasInsufficientResources(t *testing.T) { select { case e := <-l.errCh: - if !strings.Contains(e, NewJSInsufficientResourcesError().Description) { - t.Fatalf("Expected error regarding insufficient resources, got %v", e) + if !strings.Contains(e, fmt.Sprintf("%d", NewJSClusterNoPeersError(errors.New("")).ErrCode)) { + t.Fatalf("Expected error regarding no peers error, got %v", e) } case 
<-time.After(2 * time.Second): t.Fatalf("Did not get the error regarding replicas count")