From 98bf861a7a6c8f4423c8609b8e2222f736e5f8ff Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 30 Aug 2022 14:07:04 -0700 Subject: [PATCH] Updates to stream and consumer move logic. Signed-off-by: Derek Collison --- server/gateway.go | 2 +- server/jetstream_api.go | 22 +- server/jetstream_cluster.go | 281 +++++++++++-------------- server/jetstream_helpers_test.go | 22 +- server/jetstream_super_cluster_test.go | 90 +------- server/norace_test.go | 64 ++++++ server/raft.go | 39 ++-- 7 files changed, 241 insertions(+), 279 deletions(-) diff --git a/server/gateway.go b/server/gateway.go index 4cab3303..5568e33a 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -33,7 +33,7 @@ const ( defaultSolicitGatewaysDelay = time.Second defaultGatewayConnectDelay = time.Second defaultGatewayReconnectDelay = time.Second - defaultGatewayRecentSubExpiration = 250 * time.Millisecond + defaultGatewayRecentSubExpiration = 2 * time.Second defaultGatewayMaxRUnsubBeforeSwitch = 1000 oldGWReplyPrefix = "$GR." diff --git a/server/jetstream_api.go b/server/jetstream_api.go index eb39fdbc..35497fa3 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1418,13 +1418,8 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, // Handle clustered version here. if s.JetStreamIsClustered() { - // If we are inline with client, we still may need to do a callout for stream info - // during this call, so place in Go routine to not block client. - if c.kind != ROUTER && c.kind != GATEWAY { - go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil) - } else { - s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil) - } + // Always do in separate Go routine. + go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil) return } @@ -2297,7 +2292,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ return } - streamFound := false + var streamFound bool cfg := StreamConfig{} currPeers := []string{} currCluster := _EMPTY_ @@ -2328,7 +2323,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - peerFound := false + var peerFound bool for i := 0; i < len(currPeers); i++ { if currPeers[i] == srcPeer { copy(currPeers[1:], currPeers[:i]) @@ -2393,9 +2388,10 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ cfg.Placement = origPlacement - s.Noticef("Requested move of: R=%d stream '%s > %s' from old peer set %+v to new peer set %+v", - cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) - // we will always have peers and therefore never do a callout, therefore it is safe to call inline + s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v", + streamName, accName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) + + // We will always have peers and therefore never do a callout, therefore it is safe to call inline s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers) } @@ -2501,7 +2497,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v", cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) - // we will always have peers and therefore never do a callout, therefore it is safe to call inline + // We will always have peers and therefore never do a callout, therefore it is safe to call inline s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9383555b..bc3d8668 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1642,39 +1642,23 @@ func (mset *stream) removeNode() { } } -// utility function to return the number of current peers in the specified set, -// as well as the first current peer and if the leader (always current) is part of the set or not -func currentPeerCount(ci *ClusterInfo, peerSet []string, leaderId string) (currentCount int, firstPeer string, foundLeader bool) { - for _, peer := range peerSet { - // selfId is leaderId - foundCurrent := peer == leaderId - if foundCurrent { - foundLeader = true +// Helper function to generate peer info. +// lists and sets for old and new. +func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) { + newPeers = peers[split:] + oldPeers = peers[:split] + newPeerSet = make(map[string]bool, len(newPeers)) + oldPeerSet = make(map[string]bool, len(oldPeers)) + for i, peer := range peers { + if i < split { + oldPeerSet[peer] = true } else { - for _, p := range ci.Replicas { - if peer == string(getHash(p.Name)) { - if p.Current { - foundCurrent = true - } - break - } - } - } - if foundCurrent { - currentCount++ - if firstPeer == _EMPTY_ { - firstPeer = peer - } + newPeerSet[peer] = true } } return } -const defaultScaleDownDelayTicks = 2 - -// how many migration tracker ticks of delay to induce -var scaleDownDelayTicks = defaultScaleDownDelayTicks - // Monitor our stream node for this stream. func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) { s, cc := js.server(), js.cluster @@ -1692,7 +1676,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps return } - qch, lch, aq, uch, selfId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID() + qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID() s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group()) defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group()) @@ -1754,20 +1738,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // For migration tracking. var mmt *time.Ticker var mmtc <-chan time.Time - var mDelayTc int startMigrationMonitoring := func() { if mmt == nil { - mmt = time.NewTicker(1 * time.Second) + mmt = time.NewTicker(500 * time.Millisecond) mmtc = mmt.C - mDelayTc = 0 } } stopMigrationMonitoring := func() { if mmt != nil { mmt.Stop() - mmt, mmtc, mDelayTc = nil, nil, 0 + mmt, mmtc = nil, nil } } defer stopMigrationMonitoring() @@ -1935,15 +1917,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case <-uch: // keep stream assignment current sa = mset.streamAssignment() + // keep peer list up to date with config js.checkPeers(mset.raftGroup()) // We get this when we have a new stream assignment caused by an update. // We want to know if we are migrating. - migrating := mset.isMigrating() - // If we are migrating, monitor for the new peers to be caught up. - if isLeader && migrating { - if mmtc == nil { - doSnapshot() + if migrating := mset.isMigrating(); migrating { + if isLeader && mmtc == nil { startMigrationMonitoring() } } else { @@ -1955,15 +1935,12 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps stopMigrationMonitoring() continue } - // Check to see that we have someone caught up. - // TODO(dlc) - For now start checking after a second in order to give proper time to kick in any catchup logic needed. - // What we really need to do longer term is know if we need catchup and make sure that process has kicked off and/or completed. + + // Check to see where we are.. rg := mset.raftGroup() ci := js.clusterInfo(rg) - // The polling interval of one second allows this to be kicked in if needed. - if mset.hasCatchupPeers() { - mset.checkClusterInfo(ci) - } + mset.checkClusterInfo(ci) + // Track the new peers and check the ones that are current. mset.mu.RLock() replicas := mset.cfg.Replicas @@ -1973,74 +1950,65 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps stopMigrationMonitoring() continue } - // Determine if process is finished - toSkip := len(rg.Peers) - replicas - newPeerSet := rg.Peers[toSkip:] - oldPeerSet := rg.Peers[:toSkip] - newPeerTbl := map[string]struct{}{} - for _, peer := range newPeerSet { - newPeerTbl[peer] = struct{}{} - } - currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId) - // make sure to wait to ensure that catchup has started. - if currentCount != replicas { - mDelayTc = 0 - continue - } - if mDelayTc < scaleDownDelayTicks { - mDelayTc++ - continue - } + newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas) - // First make sure all consumer are properly scaled down - waitOnConsumerScaledown := false - js.mu.RLock() - if san, ok := cc.streams[accName][sa.Config.Name]; ok { - FOR_CONSUMER_SCALEDOWN: - for cName, c := range san.consumers { - if c.pending || len(c.Group.Peers) > c.Config.replicas(san.Config) { - waitOnConsumerScaledown = true - s.Debugf("Scale down of '%s > %s' blocked by consumer '%s'", - accName, san.Config.Name, cName) + // If we are part of the new peerset and we have been passed the baton. + // We will handle scale down. + if newPeerSet[ourPeerId] { + // First need to check on any consumers and make sure they have moved properly before scaling down ourselves. + js.mu.RLock() + var needToWait bool + for name, c := range sa.consumers { + for _, peer := range c.Group.Peers { + // If we have peers still in the old set block. + if oldPeerSet[peer] { + s.Debugf("Scale down of '%s > %s' blocked by consumer '%s'", accName, sa.Config.Name, name) + needToWait = true + break + } + } + if needToWait { break } - for _, peer := range c.Group.Peers { - if _, ok := newPeerTbl[peer]; !ok { - waitOnConsumerScaledown = true - s.Debugf("Scale down of '%s > %s' blocked by consumer '%s' with old peer %s", - accName, san.Config.Name, cName, peer) - break FOR_CONSUMER_SCALEDOWN + } + js.mu.RUnlock() + if needToWait { + continue + } + + // We are good to go, can scale down here. + for _, p := range oldPeers { + n.ProposeRemovePeer(p) + } + + csa := sa.copyGroup() + csa.Group.Peers = newPeers + csa.Group.Preferred = ourPeerId + csa.Group.Cluster = s.cachedClusterName() + cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa)) + s.Noticef("Scaling down '%s > %s' to %+v", accName, sa.Config.Name, s.peerSetToNames(newPeers)) + + } else { + // We are the old leader here, from the original peer set. + // We are simply waiting on the new peerset to be caught up so we can transfer leadership. + var newLeaderPeer, newLeader string + neededCurrent, current := replicas/2+1, 0 + for _, r := range ci.Replicas { + if r.Current && newPeerSet[r.peer] { + current++ + if newLeader == _EMPTY_ { + newLeaderPeer, newLeader = r.peer, r.Name } } } - } - js.mu.RUnlock() - - if waitOnConsumerScaledown { - continue - } - - // Remove the old peers or transfer leadership (after which new leader resumes with peer removal). - // stopMigrationMonitoring is invoked on actual leadership change or - // on the next tick when migration completed. - // In case these operations fail, the next tick will retry - if !foundLeader { - s.Debugf("Scale down of '%s' step down ('%s' preferred)", sa.Config.Name, firstPeer) - n.StepDown(firstPeer) - } else { - s.Noticef("Scale down of '%s' to %+v ('%s' preferred)", - sa.Config.Name, s.peerSetToNames(newPeerSet), firstPeer) - for _, p := range oldPeerSet { - n.ProposeRemovePeer(p) + // Check if we have a quorom. + if current >= neededCurrent { + s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader) + n.StepDown(newLeaderPeer) } - csa := sa.copyGroup() - csa.Group.Peers = newPeerSet - csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self - csa.Group.Preferred = firstPeer - cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa)) } - mDelayTc = 0 + case err := <-restoreDoneCh: // We have completed a restore from snapshot on this server. The stream assignment has // already been assigned but the replicas will need to catch up out of band. Consumers @@ -3779,7 +3747,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { s.Warnf("No RAFT group for consumer") return } - qch, lch, aq, uch, selfId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID() + qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID() s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group()) defer s.Debugf("Exiting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group()) @@ -3825,20 +3793,18 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // For migration tracking. var mmt *time.Ticker var mmtc <-chan time.Time - var mDelayTc int startMigrationMonitoring := func() { if mmt == nil { - mmt = time.NewTicker(1 * time.Second) + mmt = time.NewTicker(500 * time.Millisecond) mmtc = mmt.C - mDelayTc = 0 } } stopMigrationMonitoring := func() { if mmt != nil { mmt.Stop() - mmt, mmtc, mDelayTc = nil, nil, 0 + mmt, mmtc = nil, nil } } defer stopMigrationMonitoring() @@ -3926,6 +3892,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { continue } rg := o.raftGroup() + ci := js.clusterInfo(rg) replicas, err := o.replica() if err != nil { continue @@ -3935,40 +3902,39 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { stopMigrationMonitoring() continue } - toSkip := len(rg.Peers) - replicas - newPeerSet := rg.Peers[toSkip:] - oldPeerSet := rg.Peers[:toSkip] + newPeers, oldPeers, newPeerSet, _ := genPeerInfo(rg.Peers, len(rg.Peers)-replicas) - ci := js.clusterInfo(rg) - - currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId) - - // If all are current we are good - if currentCount == replicas { - if mDelayTc < scaleDownDelayTicks { - mDelayTc++ - continue + // If we are part of the new peerset and we have been passed the baton. + // We will handle scale down. + if newPeerSet[ourPeerId] { + for _, p := range oldPeers { + n.ProposeRemovePeer(p) } - // Remove the old peers or transfer leadership (after which new leader resumes with peer removal). - // stopMigrationMonitoring is invoked on actual leadership change or - // on the next tick when migration completed. - // In case these operations fail, the next tick will retry - if !foundLeader { - s.Debugf("Scale down of '%s > %s' step down ('%s' preferred)", ca.Stream, ca.Name, firstPeer) - n.StepDown(firstPeer) - } else { - s.Noticef("Scale down of '%s > %s' to %+v", ca.Stream, ca.Name, s.peerSetToNames(newPeerSet)) - // truncate this consumer - for _, p := range oldPeerSet { - n.ProposeRemovePeer(p) + cca := ca.copyGroup() + cca.Group.Peers = newPeers + cca.Group.Cluster = s.cachedClusterName() + cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca)) + s.Noticef("Scaling down '%s > %s > %s' to %+v", ca.Client.serviceAccount(), ca.Stream, ca.Name, s.peerSetToNames(newPeers)) + + } else { + var newLeaderPeer, newLeader, newCluster string + neededCurrent, current := replicas/2+1, 0 + for _, r := range ci.Replicas { + if r.Current && newPeerSet[r.peer] { + current++ + if newCluster == _EMPTY_ { + newLeaderPeer, newLeader, newCluster = r.peer, r.Name, r.cluster + } } - cca := ca.copyGroup() - cca.Group.Cluster = s.ClusterName() // use cluster name of leader/self - cca.Group.Peers = newPeerSet - cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca)) + } + + // Check if we have a quorom + if current >= neededCurrent { + s.Noticef("Transfer of consumer leader for '%s > %s > %s' to '%s'", ca.Client.serviceAccount(), ca.Stream, ca.Name, newLeader) + n.StepDown(newLeaderPeer) } } - mDelayTc = 0 + case <-t.C: doSnapshot(false) } @@ -5137,9 +5103,8 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Make copy so to not change original. rg := osa.copyGroup().Group - // Check for a move update. - // TODO(dlc) - Should add a resolve from Tags to cluster and check that vs reflect. - isMoveRequest, isMoveCancel := false, false + // Check for a move request. + var isMoveRequest, isMoveCancel bool if lPeerSet := len(peerSet); lPeerSet > 0 { isMoveRequest = true // check if this is a cancellation @@ -5301,8 +5266,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } peerSet = append(peerSet, nrg.Peers...) } - origPeers := len(rg.Peers) - if origPeers == 1 { + if len(rg.Peers) == 1 { rg.Preferred = peerSet[0] } rg.Peers = peerSet @@ -6387,29 +6351,24 @@ func (mset *stream) stateSnapshotLocked() []byte { return b } -// Wiil check if we can do message compression in RAFT and catchup logic. -// TODO(dlc) - Currently version based, could make capability based, but be more involved. +// Will check if we can do message compression in RAFT and catchup logic. func (mset *stream) checkAllowMsgCompress(peers []string) { - var ff, allowed bool + allowed := true for _, id := range peers { - if sir, ok := mset.srv.nodeToInfo.Load(id); ok && sir != nil { - // Check for capability. - if si := sir.(nodeInfo); si.cfg != nil && si.cfg.CompressOK { - if !ff { - ff, allowed = true, true - } else { - allowed = allowed && true - } - } else { - ff, allowed = true, false - } + sir, ok := mset.srv.nodeToInfo.Load(id) + if !ok || sir == nil { + allowed = false + break + } + // Check for capability. + if si := sir.(nodeInfo); si.cfg == nil || !si.cfg.CompressOK { + allowed = false + break } } - if allowed { - mset.mu.Lock() - mset.compressOK = true - mset.mu.Unlock() - } + mset.mu.Lock() + mset.compressOK = allowed + mset.mu.Unlock() } // processClusteredMsg will propose the inbound message to the underlying raft group. diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 954cdbb5..e942526b 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -54,12 +54,12 @@ func (sc *supercluster) shutdown() { if sc == nil { return } - for _, c := range sc.clusters { - shutdownCluster(c) - } for _, np := range sc.nproxies { np.stop() } + for _, c := range sc.clusters { + shutdownCluster(c) + } } func (sc *supercluster) randomServer() *Server { @@ -511,6 +511,7 @@ func (sc *supercluster) waitOnLeader() { } func (sc *supercluster) waitOnAllCurrent() { + sc.t.Helper() for _, c := range sc.clusters { c.waitOnAllCurrent() } @@ -1278,6 +1279,7 @@ func (c *cluster) waitOnServerCurrent(s *Server) { } func (c *cluster) waitOnAllCurrent() { + c.t.Helper() for _, cs := range c.servers { c.waitOnServerCurrent(cs) } @@ -1547,19 +1549,21 @@ func createNetProxy(rtt time.Duration, upRate, downRate int, serverURL string, s } func (np *netProxy) start() { + u, err := url.Parse(np.surl) + if err != nil { + panic(fmt.Sprintf("Could not parse server URL: %v", err)) + } + host := u.Host + go func() { for { client, err := np.listener.Accept() if err != nil { return } - u, err := url.Parse(np.surl) + server, err := net.DialTimeout("tcp", host, time.Second) if err != nil { - panic(fmt.Sprintf("Could not parse server URL: %v", err)) - } - server, err := net.DialTimeout("tcp", u.Host, time.Second) - if err != nil { - panic("Can't connect proxy to NATS server") + continue } np.conns = append(np.conns, client, server) go np.loop(np.rtt, np.up, client, server) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 3941125f..1f1b756f 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -17,7 +17,6 @@ package server import ( - "bytes" "encoding/json" "errors" "fmt" @@ -1798,6 +1797,10 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) { numPeers++ } if numPeers != 2*replicas { + // The move can happen very quick now, so we might already be done. + if si.Cluster.Name == "C2" { + return nil + } return fmt.Errorf("Expected to see %d replicas, got %d", 2*replicas, numPeers) } return nil @@ -2823,7 +2826,7 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) { } } if !found { - fmt.Printf("durable peer group does not match stream peer group") + t.Logf("durable peer group does not match stream peer group") } } } @@ -2876,10 +2879,6 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) { } func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) { - // Shorten this test by factor of 2. - scaleDownDelayTicks = 0 - defer func() { scaleDownDelayTicks = defaultScaleDownDelayTicks }() - server := map[string]struct{}{} sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, func(serverName, clusterName, storeDir, conf string) string { @@ -3047,6 +3046,7 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) { } moveAndCheck := func(from, to string, expectedSet ...string) { + t.Helper() move(from, to) checkFor(t, 40*time.Second, 100*time.Millisecond, func() error { return moveComplete(to, expectedSet...) }) checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return serverEmpty(from) }) @@ -3080,10 +3080,6 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) { } func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) { - // Shorten this test by factor of 2. - scaleDownDelayTicks = 0 - defer func() { scaleDownDelayTicks = defaultScaleDownDelayTicks }() - s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, func(serverName, clusterName, storeDir, conf string) string { return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName) @@ -3164,8 +3160,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) require_NoError(t, err) defer ncsys.Close() - moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{ - Server: toMoveFrom, Tags: moveTags}) + moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: toMoveFrom, Tags: moveTags}) require_NoError(t, err) rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second) require_NoError(t, err) @@ -3219,7 +3214,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) return nil }) // test draining - checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { + checkFor(t, 20*time.Second, time.Second, func() error { if !listFrom { // when needed determine which server move moved away from si, err := js.StreamInfo("TEST", nats.MaxWait(2*time.Second)) @@ -3228,7 +3223,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) } for n := range startSet { if n != si.Cluster.Leader { - found := false + var found bool for _, p := range si.Cluster.Replicas { if p.Name == n { found = true @@ -3560,73 +3555,6 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) { } } -func TestJetStreamSuperClusterStreamCathupLongRTT(t *testing.T) { - skip(t) - - // Make C2 far away. - gwm := gwProxyMap{ - "C2": &gwProxy{ - rtt: 300 * time.Millisecond, - up: 1 * 1024 * 1024 * 1024, // 1gbit - down: 1 * 1024 * 1024 * 1024, // 1gbit - }, - } - sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm) - defer sc.shutdown() - - nc, js := jsClientConnect(t, sc.randomServer()) - defer nc.Close() - - cfg := &nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"chunk.*"}, - Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}}, - Replicas: 3, - MaxMsgsPerSubject: 1, - } - - // Place a stream in C1. - _, err := js.AddStream(cfg) - require_NoError(t, err) - - chunk := bytes.Repeat([]byte("Z"), 1000*1024) // ~1MB - // 256 MB - for i := 0; i < 256; i++ { - subj := fmt.Sprintf("chunk.%d", i) - js.PublishAsync(subj, chunk) - } - select { - case <-js.PublishAsyncComplete(): - case <-time.After(5 * time.Second): - t.Fatalf("Did not receive completion signal") - } - - // C2, slow RTT. - cfg.Placement = &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}} - _, err = js.UpdateStream(cfg) - require_NoError(t, err) - - checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { - si, err := js.StreamInfo("TEST") - if err != nil { - return err - } - if si.Cluster.Name != "C2" { - return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name) - } - if si.Cluster.Leader == _EMPTY_ { - return fmt.Errorf("No leader yet") - } else if !strings.HasPrefix(si.Cluster.Leader, "C2-") { - return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader) - } - // Now we want to see that we shrink back to original. - if len(si.Cluster.Replicas) != cfg.Replicas-1 { - return fmt.Errorf("Expected %d replicas, got %d", cfg.Replicas-1, len(si.Cluster.Replicas)) - } - return nil - }) -} - func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyStaticConfig(t *testing.T) { tmpl := ` listen: 127.0.0.1:-1 diff --git a/server/norace_test.go b/server/norace_test.go index 1b3d7a03..1b3eee12 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5516,3 +5516,67 @@ func TestNoRaceJetStreamKVLock(t *testing.T) { close(start) wg.Wait() } + +func TestNoRaceJetStreamSuperClusterStreamMoveLongRTT(t *testing.T) { + // Make C2 far away. + gwm := gwProxyMap{ + "C2": &gwProxy{ + rtt: 400 * time.Millisecond, + up: 1 * 1024 * 1024 * 1024, // 1gbit + down: 1 * 1024 * 1024 * 1024, // 1gbit + }, + } + sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.randomServer()) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"chunk.*"}, + Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}}, + Replicas: 3, + } + + // Place a stream in C1. + _, err := js.AddStream(cfg) + require_NoError(t, err) + + chunk := bytes.Repeat([]byte("Z"), 1000*1024) // ~1MB + // 256 MB + for i := 0; i < 256; i++ { + subj := fmt.Sprintf("chunk.%d", i) + js.PublishAsync(subj, chunk) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // C2, slow RTT. + cfg.Placement = &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}} + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + + checkFor(t, 10*time.Second, time.Second, func() error { + si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)) + if err != nil { + return err + } + if si.Cluster.Name != "C2" { + return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name) + } + if si.Cluster.Leader == _EMPTY_ { + return fmt.Errorf("No leader yet") + } else if !strings.HasPrefix(si.Cluster.Leader, "C2-") { + return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader) + } + // Now we want to see that we shrink back to original. + if len(si.Cluster.Replicas) != cfg.Replicas-1 { + return fmt.Errorf("Expected %d replicas, got %d", cfg.Replicas-1, len(si.Cluster.Replicas)) + } + return nil + }) +} diff --git a/server/raft.go b/server/raft.go index a40c638e..1275818e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -996,7 +996,6 @@ func (n *raft) InstallSnapshot(data []byte) error { n.setWriteErr(err) return err } - n.Unlock() psnaps, _ := os.ReadDir(snapDir) @@ -1486,7 +1485,27 @@ func (n *raft) Wipe() { n.Delete() } +const ( + raftAllSubj = "$NRG.>" + raftVoteSubj = "$NRG.V.%s" + raftAppendSubj = "$NRG.AE.%s" + raftPropSubj = "$NRG.P.%s" + raftRemovePeerSubj = "$NRG.RP.%s" + raftReply = "$NRG.R.%s" + raftCatchupReply = "$NRG.CR.%s" +) + // Lock should be held (due to use of random generator) +func (n *raft) newCatchupInbox() string { + var b [replySuffixLen]byte + rn := n.prand.Int63() + for i, l := 0, rn; i < len(b); i++ { + b[i] = digits[l%base] + l /= base + } + return fmt.Sprintf(raftCatchupReply, b[:]) +} + func (n *raft) newInbox() string { var b [replySuffixLen]byte rn := n.prand.Int63() @@ -1494,18 +1513,9 @@ func (n *raft) newInbox() string { b[i] = digits[l%base] l /= base } - return fmt.Sprintf(raftReplySubj, b[:]) + return fmt.Sprintf(raftReply, b[:]) } -const ( - raftAllSubj = "$NRG.>" - raftVoteSubj = "$NRG.V.%s" - raftAppendSubj = "$NRG.AE.%s" - raftPropSubj = "$NRG.P.%s" - raftRemovePeerSubj = "$NRG.RP.%s" - raftReplySubj = "$NRG.R.%s" -) - // Our internal subscribe. // Lock should be held. func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) { @@ -1848,6 +1858,8 @@ func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) (* commit: le.Uint64(msg[16:]), pterm: le.Uint64(msg[24:]), pindex: le.Uint64(msg[32:]), + sub: sub, + reply: reply, } // Decode Entries. ne, ri := int(le.Uint16(msg[40:])), 42 @@ -1864,8 +1876,6 @@ func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) (* ae.entries = append(ae.entries, &Entry{etype, msg[ri+1 : ri+le]}) ri += le } - ae.reply = reply - ae.sub = sub ae.buf = msg return ae, nil } @@ -2626,6 +2636,7 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje // Lock should be held. func (n *raft) cancelCatchup() { n.debug("Canceling catchup subscription since we are now up to date") + if n.catchup != nil && n.catchup.sub != nil { n.unsubscribe(n.catchup.sub) } @@ -2661,7 +2672,7 @@ func (n *raft) createCatchup(ae *appendEntry) string { pindex: n.pindex, active: time.Now(), } - inbox := n.newInbox() + inbox := n.newCatchupInbox() sub, _ := n.subscribe(inbox, n.handleAppendEntry) n.catchup.sub = sub