Fix unique_tag issue with stream replica increase

When increasing the replica count, unique tags for already existing peers
were ignored, which could lead to bad placement.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2022-07-07 20:18:46 +02:00
parent 7ab256185e
commit f0ee56cf0a
2 changed files with 97 additions and 30 deletions

View File

@@ -4283,19 +4283,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
// peers is a randomized list
s, peers := cc.s, cc.meta.Peers()
// Map existing.
var ep map[string]struct{}
if le := len(existing); le > 0 {
if le >= r {
return existing
}
ep = make(map[string]struct{})
for _, p := range existing {
ep[p] = struct{}{}
//TODO preload unique tag prefix
}
}
uniqueTagPrefix := s.getOpts().JetStreamUniqueTag
if uniqueTagPrefix != _EMPTY_ {
for _, tag := range tags {
@@ -4307,6 +4294,44 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
}
var uniqueTags = make(map[string]struct{})
checkUniqueTag := func(ni *nodeInfo) bool {
// default requires the unique prefix to be present
isUnique := false
for _, t := range ni.tags {
if strings.HasPrefix(t, uniqueTagPrefix) {
if _, ok := uniqueTags[t]; !ok {
uniqueTags[t] = struct{}{}
isUnique = true
}
break
}
}
return isUnique
}
// Map existing.
var ep map[string]struct{}
if le := len(existing); le > 0 {
if le >= r {
return existing
}
ep = make(map[string]struct{})
for _, p := range existing {
ep[p] = struct{}{}
if uniqueTagPrefix == _EMPTY_ {
continue
}
si, ok := s.nodeToInfo.Load(p)
if !ok || si == nil {
continue
}
ni := si.(nodeInfo)
// collect unique tags, but do not require them as this node is already part of the peerset
checkUniqueTag(&ni)
}
}
maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets
// Shuffle them up.
@@ -4384,21 +4409,8 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
continue
}
if uniqueTagPrefix != _EMPTY_ {
// default requires the unique prefix to be present
isUnique := false
for _, t := range ni.tags {
if strings.HasPrefix(t, uniqueTagPrefix) {
if _, ok := uniqueTags[t]; !ok {
uniqueTags[t] = struct{}{}
isUnique = true
}
break
}
}
if !isUnique {
continue
}
if uniqueTagPrefix != _EMPTY_ && !checkUniqueTag(&ni) {
continue
}
// Add to our list of potential nodes.
nodes = append(nodes, wn{p.ID, available, ha})

View File

@@ -124,6 +124,25 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
})
defer s.shutdown()
inDifferentAz := func(ci *nats.ClusterInfo) (bool, error) {
t.Helper()
if len(ci.Replicas) == 0 {
return true, nil
}
// if R2 (has replica, this setup does not support R3), test if the server in a cluster picked the same az,
// as determined by modulo2 of server number which aligns with az
dummy := 0
srvnum1 := 0
srvnum2 := 0
if n, _ := fmt.Sscanf(ci.Leader, "C%d-S%d", &dummy, &srvnum1); n != 2 {
return false, fmt.Errorf("couldn't parse leader")
}
if n, _ := fmt.Sscanf(ci.Replicas[0].Name, "C%d-S%d", &dummy, &srvnum2); n != 2 {
return false, fmt.Errorf("couldn't parse replica")
}
return srvnum1%2 != srvnum2%2, nil
}
nc := natsConnect(t, s.randomServer().ClientURL())
defer nc.Close()
@@ -157,7 +176,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
} {
name := fmt.Sprintf("test-%d", i)
t.Run(name, func(t *testing.T) {
ci, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement})
si, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement})
if test.fail {
require_Error(t, err)
require_Equal(t, err.Error(), "insufficient resources")
@@ -165,10 +184,46 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
}
require_NoError(t, err)
if test.cluster != _EMPTY_ {
require_Equal(t, ci.Cluster.Name, test.cluster)
require_Equal(t, si.Cluster.Name, test.cluster)
}
// skip placement test if tags call for a particular az
if test.placement != nil && len(test.placement.Tags) > 0 {
for _, tag := range test.placement.Tags {
if strings.HasPrefix(tag, "az:") {
return
}
}
}
diff, err := inDifferentAz(si.Cluster)
require_NoError(t, err)
require_True(t, diff)
})
}
t.Run("scale-up-test", func(t *testing.T) {
// create enough streams so we hit it eventually
for i := 0; i < 10; i++ {
cfg := &nats.StreamConfig{Name: fmt.Sprintf("scale-up-%d", i), Replicas: 1,
Placement: &nats.Placement{Tags: []string{"cloud:C2-tag"}}}
si, err := js.AddStream(cfg)
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "C2")
cfg.Replicas = 2
si, err = js.UpdateStream(cfg)
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "C2")
checkFor(t, 10, 250*time.Millisecond, func() error {
if si, err := js.StreamInfo(cfg.Name); err != nil {
return err
} else if diff, err := inDifferentAz(si.Cluster); err != nil {
return err
} else if !diff {
return fmt.Errorf("not in different AZ")
}
return nil
})
}
})
}
func TestJetStreamSuperClusterBasics(t *testing.T) {