Merge pull request #3691 from nats-io/ut-test-fix-2

Make stream removal from a server consistent.
Todd Beets (committed by GitHub)
2022-12-06 17:39:23 -08:00
2 changed files with 32 additions and 29 deletions


@@ -2741,20 +2741,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 	// Check if this is for us..
 	if isMember {
 		js.processClusterCreateStream(acc, sa)
-	} else {
-		// Check if we have a raft node running, meaning we are no longer part of the group but were.
-		js.mu.Lock()
-		if node := sa.Group.node; node != nil {
-			if node.Leader() {
-				node.UpdateKnownPeers(sa.Group.Peers)
-				node.StepDown()
-			}
-			node.ProposeRemovePeer(ourID)
-			didRemove = true
-		}
-		sa.Group.node = nil
-		sa.err = nil
-		js.mu.Unlock()
+	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
+		// We have one here even though we are not a member. This can happen on re-assignment.
+		s.removeStream(ourID, mset, sa)
 	}
 
 	// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
@@ -2838,21 +2827,31 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
 		js.processClusterUpdateStream(acc, osa, sa)
 	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
 		// We have one here even though we are not a member. This can happen on re-assignment.
-		s.Debugf("JetStream removing stream '%s > %s' from this server", sa.Client.serviceAccount(), sa.Config.Name)
-		if node := mset.raftNode(); node != nil {
-			if node.Leader() {
-				node.StepDown(sa.Group.Preferred)
-			}
-			node.ProposeRemovePeer(ourID)
-			// shut down monitor by shutting down raft
-			node.Delete()
-		}
-		// wait for monitor to be shut down
-		mset.monitorWg.Wait()
-		mset.stop(true, false)
+		s.removeStream(ourID, mset, sa)
 	}
 }
 
+// Common function to remove ourself from this server.
+// This can happen on re-assignment, move, etc
+func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) {
+	if mset == nil {
+		return
+	}
+	// Make sure to use the new stream assignment, not our own.
+	s.Debugf("JetStream removing stream '%s > %s' from this server", nsa.Client.serviceAccount(), nsa.Config.Name)
+	if node := mset.raftNode(); node != nil {
+		if node.Leader() {
+			node.StepDown(nsa.Group.Preferred)
+		}
+		node.ProposeRemovePeer(ourID)
+		// shut down monitor by shutting down raft
+		node.Delete()
+	}
+	// wait for monitor to be shut down
+	mset.monitorWg.Wait()
+	mset.stop(true, false)
+}
+
 // processClusterUpdateStream is called when we have a stream assignment that
 // has been updated for an existing assignment and we are a member.
 func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAssignment) {
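
Why removeStream waits before stopping: node.Delete() only signals the stream's monitor goroutine to exit, so mset.monitorWg.Wait() is what guarantees the monitor has actually returned before mset.stop tears the stream down. A minimal sketch of that signal-then-wait ordering, using a hypothetical monitor type rather than the server's real stream/raft types:

package main

import (
	"fmt"
	"sync"
)

// monitor stands in for the per-stream monitor goroutine: it runs until
// its quit channel is closed and marks the WaitGroup done on exit.
type monitor struct {
	quit chan struct{}
	wg   sync.WaitGroup
}

func (m *monitor) start() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		<-m.quit // stand-in for the raft-driven monitor loop
	}()
}

// remove mirrors the ordering in removeStream: signal shutdown first
// (node.Delete() in the real code), then wait for the goroutine to exit
// (mset.monitorWg.Wait()) before the final teardown (mset.stop).
func (m *monitor) remove() {
	close(m.quit) // shut down monitor by shutting down its loop
	m.wg.Wait()   // wait for monitor to be shut down
	fmt.Println("monitor exited; safe to stop the stream")
}

func main() {
	m := &monitor{quit: make(chan struct{})}
	m.start()
	m.remove()
}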


@@ -1731,10 +1731,13 @@ func TestJetStreamClusterReplacementPolicyAfterPeerRemove(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		// Remove 1 peer replica (this will be random cloud region as initial placement was randomized ordering)
 		// After each successful iteration, osi will reflect the current RG peers
 		toRemove := osi.Cluster.Replicas[0].Name
-		_, err = nc.Request("$JS.API.STREAM.PEER.REMOVE.TEST", []byte(`{"peer":"`+toRemove+`"}`), time.Second*10)
+		resp, err := nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), []byte(`{"peer":"`+toRemove+`"}`), time.Second)
 		require_NoError(t, err)
+		var rpResp JSApiStreamRemovePeerResponse
+		err = json.Unmarshal(resp.Data, &rpResp)
+		require_NoError(t, err)
+		require_True(t, rpResp.Success)
 		sc.waitOnStreamLeader(globalAccountName, "TEST")
@@ -1744,7 +1747,8 @@ func TestJetStreamClusterReplacementPolicyAfterPeerRemove(t *testing.T) {
 		if len(osi.Cluster.Replicas) != 2 {
 			return fmt.Errorf("expected R3, got R%d", len(osi.Cluster.Replicas)+1)
 		}
-		// STREAM.PEER.REMOVE is asynchronous command; make sure remove has occurred by retrying
+		// STREAM.PEER.REMOVE is asynchronous command; make sure remove has occurred by
+		// checking that the toRemove peer is gone.
 		for _, replica := range osi.Cluster.Replicas {
 			if replica.Name == toRemove {
 				return fmt.Errorf("expected replaced replica, old replica still present")