Merge pull request #2493 from nats-io/remove-peer

During peer removal, try to remap any stream or consumer assets.
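The change is driven by the meta server-remove API exercised in the tests below. As a point of reference, here is a minimal sketch of issuing that request from a system-account client; the subject string stands in for the JSApiRemoveServer constant and the "peer" JSON tag mirrors JSApiMetaServerRemoveRequest, both assumptions here, as are the URL and credentials:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect with system-account credentials, as the tests below do.
	nc, err := nats.Connect("nats://127.0.0.1:4222", nats.UserInfo("admin", "s3cr3t!"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Assumed wire form of JSApiMetaServerRemoveRequest.
	req, err := json.Marshal(map[string]string{"peer": "server-to-remove"})
	if err != nil {
		log.Fatal(err)
	}
	// "$JS.API.SERVER.REMOVE" is the assumed value of JSApiRemoveServer.
	msg, err := nc.Request("$JS.API.SERVER.REMOVE", req, 2*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("remove response: %s\n", msg.Data)
}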
Derek Collison
2021-09-07 07:22:22 -07:00
committed by GitHub
3 changed files with 332 additions and 14 deletions


@@ -1012,10 +1012,60 @@ func (sa *streamAssignment) copyGroup() *streamAssignment {
return &csa
}
// Lock should be held.
func (sa *streamAssignment) missingPeers() bool {
return len(sa.Group.Peers) < sa.Config.Replicas
}
// Called when we detect a new peer. Only the meta-leader will process, checking
// for any streams (and consequently any consumers) that are missing peers.
func (js *jetStream) processAddPeer(peer string) {
js.mu.Lock()
defer js.mu.Unlock()
s, cc := js.srv, js.cluster
isLeader := cc.isLeader()
// Now check if we are meta-leader. We will check for any re-assignments.
if !isLeader {
return
}
sir, ok := s.nodeToInfo.Load(peer)
if !ok || sir == nil {
return
}
si := sir.(nodeInfo)
for _, asa := range cc.streams {
for _, sa := range asa {
if sa.missingPeers() {
// Make sure the peer is in the right cluster.
if si.cluster != sa.Client.Cluster {
continue
}
// If we are here we can add in this peer.
csa := sa.copyGroup()
csa.Group.Peers = append(csa.Group.Peers, peer)
// Send our proposal for this csa. Also use the same group definition for all the consumers.
cc.meta.Propose(encodeAddStreamAssignment(csa))
for _, ca := range sa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1.
if ca.Config.Durable != _EMPTY_ {
cca := *ca
cca.Group.Peers = csa.Group.Peers
cc.meta.Propose(encodeAddConsumerAssignment(&cca))
}
}
}
}
}
}
func (js *jetStream) processRemovePeer(peer string) {
js.mu.Lock()
s, cc := js.srv, js.cluster
isLeader := cc.isLeader()
// All nodes will check if this is them.
isUs := cc.meta.ID() == peer
disabled := js.disabled
@@ -1043,28 +1093,58 @@ func (js *jetStream) processRemovePeer(peer string) {
go s.DisableJetStream()
}
// Now check if we are meta-leader. We will attempt re-assignment.
if !isLeader {
return
}
js.mu.Lock()
defer js.mu.Unlock()
for _, asa := range cc.streams {
for _, sa := range asa {
if rg := sa.Group; rg.isMember(peer) {
js.removePeerFromStreamLocked(sa, peer)
}
}
}
}
// Assumes all checks have already been done.
func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) bool {
js.mu.Lock()
defer js.mu.Unlock()
return js.removePeerFromStreamLocked(sa, peer)
}
// Lock should be held.
func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer string) bool {
if rg := sa.Group; !rg.isMember(peer) {
return false
}
s, cc, csa := js.srv, js.cluster, sa.copyGroup()
replaced := cc.remapStreamAssignment(csa, peer)
if !replaced {
s.Warnf("JetStream cluster could not replace peer for stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
}
// Send our proposal for this csa. Also use the same group definition for all the consumers.
cc.meta.Propose(encodeAddStreamAssignment(csa))
rg := csa.Group
for _, ca := range sa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1.
if ca.Config.Durable != _EMPTY_ {
cca := *ca
cca.Group.Peers = rg.Peers
cc.meta.Propose(encodeAddConsumerAssignment(&cca))
} else if ca.Group.isMember(peer) {
// These are ephemerals. Check to see if we deleted this peer.
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
}
}
return replaced
}
// Check if we have peer related entries.
@@ -1087,6 +1167,10 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
if !isRecovering {
js.processRemovePeer(string(e.Data))
}
} else if e.Type == EntryAddPeer {
if !isRecovering {
js.processAddPeer(string(e.Data))
}
} else {
buf := e.Data
switch entryOp(buf[0]) {
@@ -1606,7 +1690,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
mset.setLastSeq(mset.store.SkipMsg())
continue
}
// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
if !isRecovering {
if err == errLastSeqMismatch {
@@ -1619,7 +1703,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
return err
}
}
case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])
if err != nil {
@@ -2827,6 +2910,8 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.stopWithFlags(true, false, false, false)
}
return nil
} else if e.Type == EntryAddPeer {
// Ignore for now.
} else {
buf := e.Data
switch entryOp(buf[0]) {
@@ -3260,6 +3345,14 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
}
}
}
// If we are here, let's at least remove the peer.
for i, peer := range sa.Group.Peers {
if peer == removePeer {
sa.Group.Peers[i] = sa.Group.Peers[len(sa.Group.Peers)-1]
sa.Group.Peers = sa.Group.Peers[:len(sa.Group.Peers)-1]
break
}
}
return false
}
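The fallback at the end of remapStreamAssignment uses Go's swap-and-truncate idiom to drop the peer in O(1) without preserving order. A self-contained sketch of the same pattern (the helper name is illustrative):

// dropPeer removes the first occurrence of p from peers by swapping in
// the last element and truncating; relative order is not preserved.
func dropPeer(peers []string, p string) []string {
	for i, peer := range peers {
		if peer == p {
			peers[i] = peers[len(peers)-1]
			return peers[:len(peers)-1]
		}
	}
	return peers
}

// For example, dropPeer([]string{"a", "b", "c"}, "a") yields ["c", "b"].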


@@ -3358,6 +3358,187 @@ func TestJetStreamClusterPeerRemovalAPI(t *testing.T) {
}
}
func TestJetStreamClusterPeerRemovalAndStreamReassignment(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()
// Client based API
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Admin based API
ml := c.leader()
nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
t.Fatalf("Failed to create system client: %v", err)
}
defer nc.Close()
// Select a non-leader server of the stream to remove.
if len(si.Cluster.Replicas) < 2 {
t.Fatalf("Not enough replicas found: %+v", si.Cluster)
}
toRemove, cl := si.Cluster.Replicas[0].Name, c.leader()
if toRemove == cl.Name() {
toRemove = si.Cluster.Replicas[1].Name
}
req := &JSApiMetaServerRemoveRequest{Server: toRemove}
jsreq, err := json.Marshal(req)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiMetaServerRemoveResponse
if err := json.Unmarshal(rmsg.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
// In case that server was also meta-leader.
c.waitOnLeader()
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
for _, s := range ml.JetStreamClusterPeers() {
if s == toRemove {
return fmt.Errorf("Server still in the peer list")
}
}
return nil
})
// Now wait until the stream is current.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
}
// We should not see the old server at all.
for _, p := range si.Cluster.Replicas {
if p.Name == toRemove {
t.Fatalf("Peer not removed yet: %+v", toRemove)
}
if !p.Current {
return fmt.Errorf("Expected replica to be current: %+v", p)
}
}
if len(si.Cluster.Replicas) != 2 {
return fmt.Errorf("Expected 2 replicas, got %d", len(si.Cluster.Replicas))
}
return nil
})
}
func TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Admin based API
ml := c.leader()
nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
t.Fatalf("Failed to create system client: %v", err)
}
defer nc.Close()
// Select a non-leader server of the stream to remove.
if len(si.Cluster.Replicas) < 2 {
t.Fatalf("Not enough replicas found: %+v", si.Cluster)
}
toRemove, cl := si.Cluster.Replicas[0].Name, c.leader()
if toRemove == cl.Name() {
toRemove = si.Cluster.Replicas[1].Name
}
req := &JSApiMetaServerRemoveRequest{Server: toRemove}
jsreq, err := json.Marshal(req)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiMetaServerRemoveResponse
if err := json.Unmarshal(rmsg.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
for _, s := range ml.JetStreamClusterPeers() {
if s == toRemove {
return fmt.Errorf("Server still in the peer list")
}
}
return nil
})
// Make sure only 2 peers at this point.
c.waitOnPeerCount(2)
// Now wait until the stream is current.
streamCurrent := func(nr int) {
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
}
// We should not see the old server at all.
for _, p := range si.Cluster.Replicas {
if p.Name == toRemove {
return fmt.Errorf("Peer not removed yet: %+v", toRemove)
}
if !p.Current {
return fmt.Errorf("Expected replica to be current: %+v", p)
}
}
if len(si.Cluster.Replicas) != nr {
return fmt.Errorf("Expected %d replicas, got %d", nr, len(si.Cluster.Replicas))
}
return nil
})
}
// Make sure the peer was removed from the stream and that we did not fill the new spot.
streamCurrent(1)
// Now add in a new server and make sure it gets added to our stream.
c.addInNewServer()
c.waitOnPeerCount(3)
streamCurrent(2)
}
func TestJetStreamClusterPeerOffline(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()
@@ -3825,7 +4006,7 @@ func TestJetStreamClusterNoDupePeerSelection(t *testing.T) {
}
}
func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
c := createJetStreamClusterExplicit(t, "RNS", 5)
defer c.shutdown()
@@ -3852,6 +4033,20 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
}
checkSubsPending(t, sub, toSend)
// Do ephemeral too.
esub, err := js.SubscribeSync("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, esub, toSend)
ci, err := esub.ConsumerInfo()
if err != nil {
t.Fatalf("Could not fetch consumer info: %v", err)
}
// Capture ephemeral's server and name.
es, en := ci.Cluster.Leader, ci.Name
// Grab stream info.
si, err := js.StreamInfo("TEST")
if err != nil {
@@ -3863,7 +4058,9 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
}
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
toRemove := peers[0]
if cl := c.leader(); toRemove == cl.Name() {
toRemove = peers[1]
}
// First test bad peer.
req := &JSApiStreamRemovePeerRequest{Peer: "NOT VALID"}
jsreq, err := json.Marshal(req)
@@ -3924,8 +4121,10 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
return nil
})
c.waitOnConsumerLeader("$G", "TEST", "cat")
// Now check consumer info as well.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("TEST", "cat", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch consumer info: %v", err)
@@ -3948,6 +4147,30 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
}
return nil
})
// Now check ephemeral consumer info.
// Make sure we did not stamp the same new group into the ephemeral, which is R=1.
ci, err = esub.ConsumerInfo()
// If the leader was same as what we just removed, this should fail.
if es == toRemove {
if err != nats.ErrConsumerNotFound {
t.Fatalf("Expected a not found error, got %v", err)
}
// Also make sure this was removed altogether.
// We may proactively move things in the future.
for cn := range js.ConsumerNames("TEST") {
if cn == en {
t.Fatalf("Expected ephemeral consumer to be deleted since we removed its only peer")
}
}
} else {
if err != nil {
t.Fatalf("Could not fetch consumer info: %v", err)
}
if len(ci.Cluster.Replicas) != 0 {
t.Fatalf("Expected no replicas for ephemeral, got %d", len(ci.Cluster.Replicas))
}
}
}
func TestJetStreamClusterStreamLeaderStepDown(t *testing.T) {
@@ -8491,7 +8714,7 @@ func (sc *supercluster) waitOnLeader() {
for time.Now().Before(expires) {
for _, c := range sc.clusters {
if leader := c.leader(); leader != nil {
time.Sleep(250 * time.Millisecond)
return
}
}
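These tests poll with checkFor until a condition holds or a deadline passes. A minimal stand-in matching the signature used above (the real helper lives in the test support code; this sketch only assumes the standard testing and time packages):

// checkFor polls fn at the given interval until it returns nil or the
// deadline expires, failing the test with the last error otherwise.
func checkFor(t *testing.T, deadline, interval time.Duration, fn func() error) {
	t.Helper()
	expires := time.Now().Add(deadline)
	var err error
	for time.Now().Before(expires) {
		if err = fn(); err == nil {
			return
		}
		time.Sleep(interval)
	}
	t.Fatalf("Timed out: %v", err)
}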


@@ -2112,6 +2112,8 @@ func (n *raft) applyCommit(index uint64) error {
}
}
n.writePeerState(&peerState{n.peerNames(), n.csz})
// We pass these up as well.
committed = append(committed, e)
case EntryRemovePeer:
peer := string(e.Data)
n.debug("Removing peer %q", peer)