diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index d1bce993..f9fec58b 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1012,10 +1012,60 @@ func (sa *streamAssignment) copyGroup() *streamAssignment {
 	return &csa
 }
 
+// Lock should be held.
+func (sa *streamAssignment) missingPeers() bool {
+	return len(sa.Group.Peers) < sa.Config.Replicas
+}
+
+// Called when we detect a new peer. Only the leader will process checking
+// for any streams, and consequently any consumers.
+func (js *jetStream) processAddPeer(peer string) {
+	js.mu.Lock()
+	defer js.mu.Unlock()
+
+	s, cc := js.srv, js.cluster
+	isLeader := cc.isLeader()
+
+	// Now check if we are meta-leader. We will check for any re-assignments.
+	if !isLeader {
+		return
+	}
+
+	sir, ok := s.nodeToInfo.Load(peer)
+	if !ok || sir == nil {
+		return
+	}
+	si := sir.(nodeInfo)
+
+	for _, asa := range cc.streams {
+		for _, sa := range asa {
+			if sa.missingPeers() {
+				// Make sure the right cluster etc.
+				if si.cluster != sa.Client.Cluster {
+					continue
+				}
+				// If we are here we can add in this peer.
+				csa := sa.copyGroup()
+				csa.Group.Peers = append(csa.Group.Peers, peer)
+				// Send our proposal for this csa. Also use same group definition for all the consumers as well.
+				cc.meta.Propose(encodeAddStreamAssignment(csa))
+				for _, ca := range sa.consumers {
+					// Ephemerals are R=1, so only auto-remap durables, or R>1.
+					if ca.Config.Durable != _EMPTY_ {
+						cca := *ca
+						cca.Group.Peers = csa.Group.Peers
+						cc.meta.Propose(encodeAddConsumerAssignment(&cca))
+					}
+				}
+			}
+		}
+	}
+}
+
 func (js *jetStream) processRemovePeer(peer string) {
 	js.mu.Lock()
 	s, cc := js.srv, js.cluster
-
+	isLeader := cc.isLeader()
 	// All nodes will check if this is them.
 	isUs := cc.meta.ID() == peer
 	disabled := js.disabled
@@ -1043,28 +1093,58 @@ func (js *jetStream) processRemovePeer(peer string) {
 		go s.DisableJetStream()
 	}
+
+	// Now check if we are meta-leader. We will attempt re-assignment.
+	if !isLeader {
+		return
+	}
+
+	js.mu.Lock()
+	defer js.mu.Unlock()
+
+	for _, asa := range cc.streams {
+		for _, sa := range asa {
+			if rg := sa.Group; rg.isMember(peer) {
+				js.removePeerFromStreamLocked(sa, peer)
+			}
+		}
+	}
 }
 
 // Assumes all checks have already been done.
 func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) bool {
 	js.mu.Lock()
 	defer js.mu.Unlock()
+	return js.removePeerFromStreamLocked(sa, peer)
+}
+
+// Lock should be held.
+func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer string) bool {
+	if rg := sa.Group; !rg.isMember(peer) {
+		return false
+	}
 	s, cc, csa := js.srv, js.cluster, sa.copyGroup()
-	if !cc.remapStreamAssignment(csa, peer) {
-		s.Warnf("JetStream cluster could not remap stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
-		return false
+	replaced := cc.remapStreamAssignment(csa, peer)
+	if !replaced {
+		s.Warnf("JetStream cluster could not replace peer for stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
 	}
 	// Send our proposal for this csa. Also use same group definition for all the consumers as well.
 	cc.meta.Propose(encodeAddStreamAssignment(csa))
 	rg := csa.Group
 	for _, ca := range sa.consumers {
-		cca := *ca
-		cca.Group.Peers = rg.Peers
-		cc.meta.Propose(encodeAddConsumerAssignment(&cca))
+		// Ephemerals are R=1, so only auto-remap durables, or R>1.
+		if ca.Config.Durable != _EMPTY_ {
+			cca := *ca
+			cca.Group.Peers = rg.Peers
+			cc.meta.Propose(encodeAddConsumerAssignment(&cca))
+		} else if ca.Group.isMember(peer) {
+			// These are ephemerals. Check to see if we deleted this peer.
+			cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
+		}
 	}
-	return true
+	return replaced
 }
 
 // Check if we have peer related entries.
@@ -1087,6 +1167,10 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
 			if !isRecovering {
 				js.processRemovePeer(string(e.Data))
 			}
+		} else if e.Type == EntryAddPeer {
+			if !isRecovering {
+				js.processAddPeer(string(e.Data))
+			}
 		} else {
 			buf := e.Data
 			switch entryOp(buf[0]) {
@@ -1606,7 +1690,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 					mset.setLastSeq(mset.store.SkipMsg())
 					continue
 				}
-
+				// Process the actual message here.
 				if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
 					if !isRecovering {
 						if err == errLastSeqMismatch {
@@ -1619,7 +1703,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 							return err
 						}
 					}
-
 				case deleteMsgOp:
 					md, err := decodeMsgDelete(buf[1:])
 					if err != nil {
@@ -2827,6 +2910,8 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
 				o.stopWithFlags(true, false, false, false)
 			}
 			return nil
+		} else if e.Type == EntryAddPeer {
+			// Ignore for now.
 		} else {
 			buf := e.Data
 			switch entryOp(buf[0]) {
@@ -3260,6 +3345,14 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
 			}
 		}
 	}
+	// If we are here let's remove the peer at least.
+	for i, peer := range sa.Group.Peers {
+		if peer == removePeer {
+			sa.Group.Peers[i] = sa.Group.Peers[len(sa.Group.Peers)-1]
+			sa.Group.Peers = sa.Group.Peers[:len(sa.Group.Peers)-1]
+			break
+		}
+	}
 	return false
 }
 
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 0906ea50..c8f71d11 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -3358,6 +3358,187 @@ func TestJetStreamClusterPeerRemovalAPI(t *testing.T) {
 	}
 }
 
+func TestJetStreamClusterPeerRemovalAndStreamReassignment(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R5S", 5)
+	defer c.shutdown()
+
+	// Client based API
+	s := c.randomNonLeader()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	si, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo", "bar"},
+		Replicas: 3,
+	})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	// Admin based API
+	ml := c.leader()
+	nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
+	if err != nil {
+		t.Fatalf("Failed to create system client: %v", err)
+	}
+	defer nc.Close()
+
+	// Select the non-leader server for the stream to remove.
+	if len(si.Cluster.Replicas) < 2 {
+		t.Fatalf("Not enough replicas found: %+v", si.Cluster)
+	}
+	toRemove, cl := si.Cluster.Replicas[0].Name, c.leader()
+	if toRemove == cl.Name() {
+		toRemove = si.Cluster.Replicas[1].Name
+	}
+
+	req := &JSApiMetaServerRemoveRequest{Server: toRemove}
+	jsreq, err := json.Marshal(req)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	rmsg, err := nc.Request(JSApiRemoveServer, jsreq, time.Second)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	var resp JSApiMetaServerRemoveResponse
+	if err := json.Unmarshal(rmsg.Data, &resp); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if resp.Error != nil {
+		t.Fatalf("Unexpected error: %+v", resp.Error)
+	}
+	// In case that server was also meta-leader.
+	c.waitOnLeader()
+
+	checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
+		for _, s := range ml.JetStreamClusterPeers() {
+			if s == toRemove {
+				return fmt.Errorf("Server still in the peer list")
+			}
+		}
+		return nil
+	})
+
+	// Now wait until the stream is current.
+	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
+		si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
+		if err != nil {
+			return fmt.Errorf("Could not fetch stream info: %v", err)
+		}
+		// We should not see the old server at all.
+		for _, p := range si.Cluster.Replicas {
+			if p.Name == toRemove {
+				t.Fatalf("Peer not removed yet: %+v", toRemove)
+			}
+			if !p.Current {
+				return fmt.Errorf("Expected replica to be current: %+v", p)
+			}
+		}
+		if len(si.Cluster.Replicas) != 2 {
+			return fmt.Errorf("Expected 2 replicas, got %d", len(si.Cluster.Replicas))
+		}
+		return nil
+	})
+}
+
+func TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Client based API
+	s := c.randomNonLeader()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	si, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo", "bar"},
+		Replicas: 3,
+	})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	// Admin based API
+	ml := c.leader()
+	nc, err = nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
+	if err != nil {
+		t.Fatalf("Failed to create system client: %v", err)
+	}
+	defer nc.Close()
+
+	// Select the non-leader server for the stream to remove.
+	if len(si.Cluster.Replicas) < 2 {
+		t.Fatalf("Not enough replicas found: %+v", si.Cluster)
+	}
+	toRemove, cl := si.Cluster.Replicas[0].Name, c.leader()
+	if toRemove == cl.Name() {
+		toRemove = si.Cluster.Replicas[1].Name
+	}
+
+	req := &JSApiMetaServerRemoveRequest{Server: toRemove}
+	jsreq, err := json.Marshal(req)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	rmsg, err := nc.Request(JSApiRemoveServer, jsreq, 2*time.Second)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	var resp JSApiMetaServerRemoveResponse
+	if err := json.Unmarshal(rmsg.Data, &resp); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if resp.Error != nil {
+		t.Fatalf("Unexpected error: %+v", resp.Error)
+	}
+	checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
+		for _, s := range ml.JetStreamClusterPeers() {
+			if s == toRemove {
+				return fmt.Errorf("Server still in the peer list")
+			}
+		}
+		return nil
+	})
+	// Make sure only 2 peers at this point.
+	c.waitOnPeerCount(2)
+
+	// Now wait until the stream is current.
+	streamCurrent := func(nr int) {
+		checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
+			si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
+			if err != nil {
+				return fmt.Errorf("Could not fetch stream info: %v", err)
+			}
+			// We should not see the old server at all.
+			for _, p := range si.Cluster.Replicas {
+				if p.Name == toRemove {
+					return fmt.Errorf("Peer not removed yet: %+v", toRemove)
+				}
+				if !p.Current {
+					return fmt.Errorf("Expected replica to be current: %+v", p)
+				}
+			}
+			if len(si.Cluster.Replicas) != nr {
+				return fmt.Errorf("Expected %d replicas, got %d", nr, len(si.Cluster.Replicas))
+			}
+			return nil
+		})
+	}
+
+	// Make sure the peer was removed from the stream and that we did not fill the new spot.
+	streamCurrent(1)
+
+	// Now add in a new server and make sure it gets added to our stream.
+	c.addInNewServer()
+	c.waitOnPeerCount(3)
+
+	streamCurrent(2)
+}
+
 func TestJetStreamClusterPeerOffline(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R5S", 5)
 	defer c.shutdown()
@@ -3825,7 +4006,7 @@ func TestJetStreamClusterNoDupePeerSelection(t *testing.T) {
 	}
 }
 
-func TestJetStreamClusterRemovePeer(t *testing.T) {
+func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "RNS", 5)
 	defer c.shutdown()
 
@@ -3852,6 +4033,20 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
 	}
 	checkSubsPending(t, sub, toSend)
 
+	// Do ephemeral too.
+	esub, err := js.SubscribeSync("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	checkSubsPending(t, esub, toSend)
+
+	ci, err := esub.ConsumerInfo()
+	if err != nil {
+		t.Fatalf("Could not fetch consumer info: %v", err)
+	}
+	// Capture ephemeral's server and name.
+	es, en := ci.Cluster.Leader, ci.Name
+
 	// Grab stream info.
 	si, err := js.StreamInfo("TEST")
 	if err != nil {
@@ -3863,7 +4058,9 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
 	}
 	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
 	toRemove := peers[0]
-
+	if cl := c.leader(); toRemove == cl.Name() {
+		toRemove = peers[1]
+	}
 	// First test bad peer.
 	req := &JSApiStreamRemovePeerRequest{Peer: "NOT VALID"}
 	jsreq, err := json.Marshal(req)
@@ -3924,8 +4121,10 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
 		return nil
 	})
 
+	c.waitOnConsumerLeader("$G", "TEST", "cat")
+
 	// Now check consumer info as well.
-	checkFor(t, 30*time.Second, 100*time.Millisecond, func() error {
+	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
 		ci, err := js.ConsumerInfo("TEST", "cat", nats.MaxWait(time.Second))
 		if err != nil {
 			return fmt.Errorf("Could not fetch consumer info: %v", err)
@@ -3948,6 +4147,30 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
 		}
 		return nil
 	})
+
+	// Now check ephemeral consumer info.
+	// Make sure we did not stamp same new group into the ephemeral where R=1.
+	ci, err = esub.ConsumerInfo()
+	// If the leader was the same as what we just removed, this should fail.
+	if es == toRemove {
+		if err != nats.ErrConsumerNotFound {
+			t.Fatalf("Expected a not found error, got %v", err)
+		}
+		// Also make sure this was removed altogether.
+		// We may proactively move things in the future.
+		for cn := range js.ConsumerNames("TEST") {
+			if cn == en {
+				t.Fatalf("Expected ephemeral consumer to be deleted since we removed its only peer")
+			}
+		}
+	} else {
+		if err != nil {
+			t.Fatalf("Could not fetch consumer info: %v", err)
+		}
+		if len(ci.Cluster.Replicas) != 0 {
+			t.Fatalf("Expected no replicas for ephemeral, got %d", len(ci.Cluster.Replicas))
+		}
+	}
 }
 
 func TestJetStreamClusterStreamLeaderStepDown(t *testing.T) {
@@ -8491,7 +8714,7 @@ func (sc *supercluster) waitOnLeader() {
 	for time.Now().Before(expires) {
 		for _, c := range sc.clusters {
 			if leader := c.leader(); leader != nil {
-				time.Sleep(200 * time.Millisecond)
+				time.Sleep(250 * time.Millisecond)
 				return
 			}
 		}
diff --git a/server/raft.go b/server/raft.go
index 70382c47..aaa78f3c 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -2112,6 +2112,8 @@ func (n *raft) applyCommit(index uint64) error {
 			}
 		}
 		n.writePeerState(&peerState{n.peerNames(), n.csz})
+		// We pass these up as well.
+		committed = append(committed, e)
 	case EntryRemovePeer:
 		peer := string(e.Data)
 		n.debug("Removing peer %q", peer)
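
For orientation, here is a minimal, self-contained Go sketch of the re-assignment idea that processAddPeer introduces above. The types and the addPeerIfMissing helper are hypothetical stand-ins, not the server's streamAssignment or raftGroup structures: a stream group sitting below its configured replica count gets the new peer appended, durable consumer groups are stamped with the same peer set, and ephemerals (R=1) are left alone, mirroring the patch.

// Standalone sketch only; simplified stand-in types, not server code.
package main

import "fmt"

type group struct{ Peers []string }

type streamAssign struct {
	Replicas int      // configured R for the stream
	Group    group    // current peer set for the stream's raft group
	Durables []*group // durable consumer groups that should track the stream's peers
}

// addPeerIfMissing tops the stream's group back up to R with the new peer and
// stamps the same peer set onto durable consumer groups. Ephemerals are skipped
// by simply not being in Durables.
func addPeerIfMissing(sa *streamAssign, peer string) bool {
	if len(sa.Group.Peers) >= sa.Replicas {
		return false // group already at R, nothing to do
	}
	sa.Group.Peers = append(sa.Group.Peers, peer)
	for _, cg := range sa.Durables {
		cg.Peers = sa.Group.Peers // durables follow the stream's peer set
	}
	return true
}

func main() {
	sa := &streamAssign{Replicas: 3, Group: group{Peers: []string{"s1", "s2"}}}
	sa.Durables = []*group{{Peers: sa.Group.Peers}}
	if addPeerIfMissing(sa, "s3") {
		fmt.Println("stream peers: ", sa.Group.Peers)
		fmt.Println("durable peers:", sa.Durables[0].Peers)
	}
}

In the real patch this only runs on the meta-leader, and the updated assignments are proposed through the metadata raft group rather than mutated in place; the sketch just shows the peer-set bookkeeping.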