Merge pull request #3678 from nats-io/ut-replacepeer

tag policies not honored in reassignment after peer remove
Derek Collison
2022-12-06 17:50:11 -08:00
committed by GitHub
3 changed files with 317 additions and 67 deletions
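
For context, the "tag policies" in question come from server tags plus the JetStream unique_tag option, which forbids placing two replicas of the same asset on servers sharing a tag value under that prefix. A minimal server config sketch (hypothetical names and paths, not from this PR):

	server_name: n1-aws
	server_tags: ["cloud:aws"]

	jetstream {
		store_dir: "/data/jetstream"
		# With unique_tag set, no two replicas of an R3 stream may land on
		# servers that carry the same cloud:* tag value.
		unique_tag: "cloud"
	}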


@@ -2406,7 +2406,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
 		cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
 	}
-	peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1)
+	peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
 	if len(peers) <= cfg.Replicas {
 		// since expanding in the same cluster did not yield a result, try in different cluster
 		peers = nil
@@ -2421,7 +2421,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
 	errs := &selectPeerError{}
 	errs.accumulate(e)
 	for cluster := range clusters {
-		newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0)
+		newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
 		if len(newPeers) >= cfg.Replicas {
 			peers = append([]string{}, currPeers...)
 			peers = append(peers, newPeers[:cfg.Replicas]...)
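
To make the new trailing argument concrete: a minimal, hypothetical sketch (not server code) of how an "existing" list and an "ignore" list interact during peer selection. Existing peers stay at the front of the result and are skipped while scanning candidates; ignored peers are never re-selected at all.

	package main

	import "fmt"

	// pickPeers fills a group of size want from candidates, keeping existing
	// peers at the front and never selecting peers marked as ignored.
	func pickPeers(candidates, existing, ignore []string, want int) []string {
		skip := make(map[string]struct{}, len(existing)+len(ignore))
		for _, p := range existing {
			skip[p] = struct{}{} // already in the group, re-added below
		}
		for _, p := range ignore {
			skip[p] = struct{}{} // being removed, must not come back
		}
		selected := append([]string{}, existing...)
		for _, c := range candidates {
			if len(selected) >= want {
				break
			}
			if _, ok := skip[c]; ok {
				continue
			}
			selected = append(selected, c)
		}
		return selected
	}

	func main() {
		// Replace s3 in an R3 group: retain s1 and s2, ignore s3.
		fmt.Println(pickPeers([]string{"s1", "s2", "s3", "s4"}, []string{"s1", "s2"}, []string{"s3"}, 3))
		// Prints: [s1 s2 s4]
	}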


@@ -2741,20 +2741,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 	// Check if this is for us..
 	if isMember {
 		js.processClusterCreateStream(acc, sa)
-	} else {
-		// Check if we have a raft node running, meaning we are no longer part of the group but were.
-		js.mu.Lock()
-		if node := sa.Group.node; node != nil {
-			if node.Leader() {
-				node.UpdateKnownPeers(sa.Group.Peers)
-				node.StepDown()
-			}
-			node.ProposeRemovePeer(ourID)
-			didRemove = true
-		}
-		sa.Group.node = nil
-		sa.err = nil
-		js.mu.Unlock()
+	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
+		// We have one here even though we are not a member. This can happen on re-assignment.
+		s.removeStream(ourID, mset, sa)
 	}

 	// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
@@ -2838,21 +2827,31 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
 		js.processClusterUpdateStream(acc, osa, sa)
 	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
 		// We have one here even though we are not a member. This can happen on re-assignment.
-		s.Debugf("JetStream removing stream '%s > %s' from this server", sa.Client.serviceAccount(), sa.Config.Name)
-		if node := mset.raftNode(); node != nil {
-			if node.Leader() {
-				node.StepDown(sa.Group.Preferred)
-			}
-			node.ProposeRemovePeer(ourID)
-			// shut down monitor by shutting down raft
-			node.Delete()
-		}
-		// wait for monitor to be shut down
-		mset.monitorWg.Wait()
-		mset.stop(true, false)
+		s.removeStream(ourID, mset, sa)
 	}
 }

+// Common function to remove ourself from this server.
+// This can happen on re-assignment, move, etc
+func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) {
+	if mset == nil {
+		return
+	}
+	// Make sure to use the new stream assignment, not our own.
+	s.Debugf("JetStream removing stream '%s > %s' from this server", nsa.Client.serviceAccount(), nsa.Config.Name)
+	if node := mset.raftNode(); node != nil {
+		if node.Leader() {
+			node.StepDown(nsa.Group.Preferred)
+		}
+		node.ProposeRemovePeer(ourID)
+		// shut down monitor by shutting down raft
+		node.Delete()
+	}
+	// wait for monitor to be shut down
+	mset.monitorWg.Wait()
+	mset.stop(true, false)
+}
+
 // processClusterUpdateStream is called when we have a stream assignment that
 // has been updated for an existing assignment and we are a member.
 func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAssignment) {
@@ -4620,41 +4619,25 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
 // Lock should be held.
 func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePeer string) bool {
-	// Need to select a replacement peer
-	s, now, cluster := cc.s, time.Now(), sa.Client.Cluster
-	if sa.Config.Placement != nil && sa.Config.Placement.Cluster != _EMPTY_ {
-		cluster = sa.Config.Placement.Cluster
-	}
+	// Invoke placement algo passing RG peers that stay (existing) and the peer that is being removed (ignore)
+	var retain, ignore []string
+	for _, v := range sa.Group.Peers {
+		if v == removePeer {
+			ignore = append(ignore, v)
+		} else {
+			retain = append(retain, v)
+		}
+	}
-	ourID := cc.meta.ID()
-	for _, p := range cc.meta.Peers() {
-		// If it is not in our list it's probably shutdown, so don't consider.
-		if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(nodeInfo).offline {
-			continue
-		}
-		// Make sure they are active and current and not already part of our group.
-		current, lastSeen := p.Current, now.Sub(p.Last)
-		// We do not track activity of ourselves so ignore.
-		if p.ID == ourID {
-			lastSeen = 0
-		}
-		if !current || lastSeen > lostQuorumInterval || sa.Group.isMember(p.ID) {
-			continue
-		}
-		// Make sure the correct cluster.
-		if s.clusterNameForNode(p.ID) != cluster {
-			continue
-		}
-		// If we are here we have our candidate replacement, swap out the old one.
-		for i, peer := range sa.Group.Peers {
-			if peer == removePeer {
-				sa.Group.Peers[i] = p.ID
-				// Don't influence preferred leader.
-				sa.Group.Preferred = _EMPTY_
-				return true
-			}
-		}
-	}
+	newPeers, placementError := cc.selectPeerGroup(len(sa.Group.Peers), sa.Group.Cluster, sa.Config, retain, 0, ignore)
+	if placementError == nil {
+		sa.Group.Peers = newPeers
+		// Don't influence preferred leader.
+		sa.Group.Preferred = _EMPTY_
+		return true
+	}
 	// If we are here let's remove the peer at least.
 	for i, peer := range sa.Group.Peers {
 		if peer == removePeer {
@@ -4738,7 +4721,7 @@ func (e *selectPeerError) accumulate(eAdd *selectPeerError) {
 // selectPeerGroup will select a group of peers to start a raft group.
 // when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped
-func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) ([]string, *selectPeerError) {
+func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int, ignore []string) ([]string, *selectPeerError) {
 	if cluster == _EMPTY_ || cfg == nil {
 		return nil, &selectPeerError{misc: true}
 	}
@@ -4814,6 +4797,15 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 		}
 	}

+	// Map ignore
+	var ip map[string]struct{}
+	if li := len(ignore); li > 0 {
+		ip = make(map[string]struct{})
+		for _, p := range ignore {
+			ip[p] = struct{}{}
+		}
+	}
+
 	maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets
 	// An error is a result of multiple individual placement decisions.
@@ -4842,11 +4834,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 			continue
 		}
+		// If ignore skip
+		if _, ok := ip[p.ID]; ok {
+			continue
+		}
 		// If existing also skip, we will add back in to front of the list when done.
-		if ep != nil {
-			if _, ok := ep[p.ID]; ok {
-				continue
-			}
-		}
+		if _, ok := ep[p.ID]; ok {
+			continue
+		}
 		if ni.tags.Contains(jsExcludePlacement) {
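
An aside on why the dropped ep != nil guard is safe to remove: in Go, a lookup on a nil map simply returns the zero value, so both ep and the new ip map can stay nil when empty and still be read without a guard. A quick self-contained illustration:

	package main

	import "fmt"

	func main() {
		var m map[string]struct{} // nil map: reads are safe, writes would panic
		_, ok := m["peer"]
		fmt.Println(ok) // prints: false
	}
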
@@ -5021,7 +5016,7 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r
 	// Need to create a group here.
 	errs := &selectPeerError{}
 	for _, cn := range clusters {
-		peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0)
+		peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0, nil)
 		if len(peers) < replicas {
 			errs.accumulate(err)
 			continue
@@ -5387,7 +5382,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 	if isReplicaChange {
 		// We are adding new peers here.
 		if newCfg.Replicas > len(rg.Peers) {
-			peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0)
+			peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0, nil)
 			if err != nil {
 				resp.Error = NewJSClusterNoPeersError(err)
 				s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
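
Taken together, the remapStreamAssignment change replaces the old hand-rolled scan (first healthy peer in the same cluster, with no tag awareness) with a full re-run of the placement algorithm, retaining the surviving peers and ignoring the removed one. A condensed sketch of the new control flow, with a hypothetical pick callback standing in for selectPeerGroup:

	// remapSketch partitions the group's peers and delegates to the placement
	// algorithm so tag and unique_tag policies are re-evaluated on replacement.
	// If no compliant replacement exists, the group simply shrinks (e.g. R3 -> R2).
	func remapSketch(groupPeers []string, removePeer string, pick func(retain, ignore []string) ([]string, error)) ([]string, bool) {
		var retain, ignore []string
		for _, p := range groupPeers {
			if p == removePeer {
				ignore = append(ignore, p)
			} else {
				retain = append(retain, p)
			}
		}
		if newPeers, err := pick(retain, ignore); err == nil {
			return newPeers, true // full replacement honoring placement policy
		}
		return retain, false // no compliant peer: drop to a smaller group
	}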


@@ -1703,3 +1703,258 @@ func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
 		return nil
 	})
 }
+
+func TestJetStreamClusterReplacementPolicyAfterPeerRemove(t *testing.T) {
+	// R3 scenario where there is a redundant node in each unique cloud, so removing a peer
+	// should result in an immediate replacement that also preserves cloud uniqueness.
+	sc := createJetStreamClusterExplicit(t, "PR9", 9)
+	sc.waitOnPeerCount(9)
+
+	reset := func(s *Server) {
+		s.mu.Lock()
+		rch := s.sys.resetCh
+		s.mu.Unlock()
+		if rch != nil {
+			rch <- struct{}{}
+		}
+		s.sendStatszUpdate()
+	}
+
+	tags := []string{"cloud:aws", "cloud:aws", "cloud:aws", "cloud:gcp", "cloud:gcp", "cloud:gcp", "cloud:az", "cloud:az", "cloud:az"}
+	var serverUTags = make(map[string]string)
+
+	for i, s := range sc.servers {
+		s.optsMu.Lock()
+		serverUTags[s.Name()] = tags[i]
+		s.opts.Tags.Add(tags[i])
+		s.opts.JetStreamUniqueTag = "cloud"
+		s.optsMu.Unlock()
+		reset(s)
+	}
+
+	ml := sc.leader()
+	js := ml.getJetStream()
+	require_True(t, js != nil)
+	js.mu.RLock()
+	cc := js.cluster
+	require_True(t, cc != nil)
+
+	// Walk and make sure all tags are registered.
+	expires := time.Now().Add(10 * time.Second)
+	for time.Now().Before(expires) {
+		allOK := true
+		for _, p := range cc.meta.Peers() {
+			si, ok := ml.nodeToInfo.Load(p.ID)
+			require_True(t, ok)
+			ni := si.(nodeInfo)
+			if len(ni.tags) == 0 {
+				allOK = false
+				reset(sc.serverByName(ni.name))
+			}
+		}
+		if allOK {
+			break
+		}
+	}
+	js.mu.RUnlock()
+
+	defer sc.shutdown()
+	sc.waitOnClusterReadyWithNumPeers(9)
+
+	s := sc.leader()
+	nc, jsc := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := jsc.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	sc.waitOnStreamLeader(globalAccountName, "TEST")
+
+	osi, err := jsc.StreamInfo("TEST")
+	require_NoError(t, err)
+
+	// Double check that the original placement honors unique_tag.
+	var uTags = make(map[string]struct{})
+	uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
+	for _, replica := range osi.Cluster.Replicas {
+		evalTag := serverUTags[replica.Name]
+		if _, exists := uTags[evalTag]; !exists {
+			uTags[evalTag] = struct{}{}
+			continue
+		} else {
+			t.Fatalf("expected initial placement to honor unique_tag")
+		}
+	}
+
+	// Remove a peer and select a replacement 5 times to avoid a false positive.
+	for i := 0; i < 5; i++ {
+		// Remove 1 peer replica (this will be a random cloud region since initial placement ordering was randomized).
+		// After each successful iteration, osi will reflect the current RG peers.
+		toRemove := osi.Cluster.Replicas[0].Name
+		resp, err := nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), []byte(`{"peer":"`+toRemove+`"}`), time.Second)
+		require_NoError(t, err)
+		var rpResp JSApiStreamRemovePeerResponse
+		err = json.Unmarshal(resp.Data, &rpResp)
+		require_NoError(t, err)
+		require_True(t, rpResp.Success)
+		sc.waitOnStreamLeader(globalAccountName, "TEST")
+
+		checkFor(t, time.Second, 200*time.Millisecond, func() error {
+			osi, err = jsc.StreamInfo("TEST")
+			require_NoError(t, err)
+			if len(osi.Cluster.Replicas) != 2 {
+				return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
+			}
+			// STREAM.PEER.REMOVE is an asynchronous command; make sure the remove has
+			// occurred by checking that the toRemove peer is gone.
+			for _, replica := range osi.Cluster.Replicas {
+				if replica.Name == toRemove {
+					return fmt.Errorf("expected replaced replica, old replica still present")
+				}
+			}
+			return nil
+		})
+
+		// Validate that the replacement with the new peer still honors unique_tag.
+		uTags = make(map[string]struct{}) // reset
+		uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
+		for _, replica := range osi.Cluster.Replicas {
+			evalTag := serverUTags[replica.Name]
+			if _, exists := uTags[evalTag]; !exists {
+				uTags[evalTag] = struct{}{}
+				continue
+			} else {
+				t.Fatalf("expected new peer and revised placement to honor unique_tag")
+			}
+		}
+	}
+}
+
+func TestJetStreamClusterReplacementPolicyAfterPeerRemoveNoPlace(t *testing.T) {
+	// R3 scenario where there are exactly three unique cloud nodes, so removing a peer
+	// should NOT result in a new peer.
+	sc := createJetStreamClusterExplicit(t, "threeup", 3)
+	sc.waitOnPeerCount(3)
+
+	reset := func(s *Server) {
+		s.mu.Lock()
+		rch := s.sys.resetCh
+		s.mu.Unlock()
+		if rch != nil {
+			rch <- struct{}{}
+		}
+		s.sendStatszUpdate()
+	}
+
+	tags := []string{"cloud:aws", "cloud:gcp", "cloud:az"}
+	var serverUTags = make(map[string]string)
+
+	for i, s := range sc.servers {
+		s.optsMu.Lock()
+		serverUTags[s.Name()] = tags[i]
+		s.opts.Tags.Add(tags[i])
+		s.opts.JetStreamUniqueTag = "cloud"
+		s.optsMu.Unlock()
+		reset(s)
+	}
+
+	ml := sc.leader()
+	js := ml.getJetStream()
+	require_True(t, js != nil)
+	js.mu.RLock()
+	cc := js.cluster
+	require_True(t, cc != nil)
+
+	// Walk and make sure all tags are registered.
+	expires := time.Now().Add(10 * time.Second)
+	for time.Now().Before(expires) {
+		allOK := true
+		for _, p := range cc.meta.Peers() {
+			si, ok := ml.nodeToInfo.Load(p.ID)
+			require_True(t, ok)
+			ni := si.(nodeInfo)
+			if len(ni.tags) == 0 {
+				allOK = false
+				reset(sc.serverByName(ni.name))
+			}
+		}
+		if allOK {
+			break
+		}
+	}
+	js.mu.RUnlock()
+
+	defer sc.shutdown()
+	sc.waitOnClusterReadyWithNumPeers(3)
+
+	s := sc.leader()
+	nc, jsc := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := jsc.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	sc.waitOnStreamLeader(globalAccountName, "TEST")
+
+	osi, err := jsc.StreamInfo("TEST")
+	require_NoError(t, err)
+
+	// Double check that the original placement honors unique_tag.
+	var uTags = make(map[string]struct{})
+	uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
+	for _, replica := range osi.Cluster.Replicas {
+		evalTag := serverUTags[replica.Name]
+		if _, exists := uTags[evalTag]; !exists {
+			uTags[evalTag] = struct{}{}
+			continue
+		} else {
+			t.Fatalf("expected initial placement to honor unique_tag")
+		}
+	}
+
+	// Remove 1 peer replica (this will be a random cloud region since initial placement ordering was randomized).
+	_, err = nc.Request("$JS.API.STREAM.PEER.REMOVE.TEST", []byte(`{"peer":"`+osi.Cluster.Replicas[0].Name+`"}`), time.Second*10)
+	require_NoError(t, err)
+	sc.waitOnStreamLeader(globalAccountName, "TEST")
+
+	// Verify R2 since no eligible peer can replace the removed peer without breaking the unique constraint.
+	checkFor(t, time.Second, 200*time.Millisecond, func() error {
+		osi, err = jsc.StreamInfo("TEST")
+		require_NoError(t, err)
+		if len(osi.Cluster.Replicas) != 1 {
+			return fmt.Errorf("expected R2, got R%d", len(osi.Cluster.Replicas)+1)
+		}
+		return nil
+	})
+
+	// Validate that the remaining members still honor unique tags.
+	uTags = make(map[string]struct{}) // reset
+	uTags[serverUTags[osi.Cluster.Leader]] = struct{}{}
+	for _, replica := range osi.Cluster.Replicas {
+		evalTag := serverUTags[replica.Name]
+		if _, exists := uTags[evalTag]; !exists {
+			uTags[evalTag] = struct{}{}
+			continue
+		} else {
+			t.Fatalf("expected revised placement to honor unique_tag")
+		}
+	}
+}