Merge pull request #3234 from nats-io/stream-move-fix-first-commit-only

fixes peer removal, simplifies move, more tests
This commit is contained in:
Matthias Hanel
2022-07-07 03:54:10 +02:00
committed by GitHub
6 changed files with 379 additions and 125 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.9.0-beta.2"
VERSION = "2.9.0-beta.3"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2266,10 +2266,11 @@ func (o *consumer) isFiltered() bool {
if mset == nil {
return true
}
if len(mset.cfg.Subjects) > 1 {
return true
if len(mset.cfg.Subjects) == 1 {
return o.cfg.FilterSubject != mset.cfg.Subjects[0]
}
return o.cfg.FilterSubject != mset.cfg.Subjects[0]
// All else return true.
return true
}
// Check if we need an ack for this store seq.
@@ -3794,6 +3795,13 @@ func (mset *stream) deleteConsumer(o *consumer) error {
return o.delete()
}
// getStream returns the stream this consumer is attached to, read under
// the consumer's read lock. The result may be nil if the consumer has
// been detached from its stream, so callers must check before use.
func (o *consumer) getStream() *stream {
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.mset
}
func (o *consumer) streamName() string {
o.mu.RLock()
mset := o.mset
@@ -3887,6 +3895,8 @@ func (o *consumer) delete() error {
func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.mu.Lock()
js := o.js
if o.closed {
o.mu.Unlock()
return nil
@@ -3940,6 +3950,12 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
n := o.node
qgroup := o.cfg.DeliverGroup
o.ackMsgs.unregister()
// For cleaning up the node assignment.
var ca *consumerAssignment
if dflag {
ca = o.ca
}
o.mu.Unlock()
if c != nil {
@@ -3993,6 +4009,14 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
}
}
if ca != nil {
js.mu.Lock()
if ca.Group != nil {
ca.Group.node = nil
}
js.mu.Unlock()
}
// Clean up our store.
var err error
if store != nil {

View File

@@ -566,28 +566,15 @@ type JSApiMetaServerRemoveResponse struct {
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
/*
// JSApiMetaServerStreamInfoRequest will provide which streams are located on a particular server
type JSApiMetaServerStreamInfoRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"peer"`
}
// JSApiMetaServerStreamInfoResponse is the response to a peer evacuation request in the meta group.
type JSApiMetaServerStreamInfoResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
Content map[string][]StreamConfig `json:"content,omitempty"`
}
const JSApiMetaServerStreamInfoType = "io.nats.jetstream.api.v1.meta_server_stream_info"
*/
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
type JSApiMetaServerStreamMoveRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"peer"`
Server string `json:"server"`
// Cluster the server is in
Cluster string `json:"cluster,omitempty"`
// Domain the server is in
Domain string `json:"domain,omitempty"`
// Account name the stream to move is in
Account string `json:"account"`
// Name of stream to move
@@ -2243,18 +2230,16 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == req.Server {
srcPeer = p.ID
break
if req.Cluster == _EMPTY_ || req.Cluster == si.(nodeInfo).cluster {
if req.Domain == _EMPTY_ || req.Domain == si.(nodeInfo).domain {
srcPeer = p.ID
break
}
}
}
}
js.mu.RUnlock()
if srcPeer == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
targetAcc, ok := s.accounts.Load(req.Account)
if !ok {
resp.Error = NewJSNoAccountError()
@@ -2279,21 +2264,36 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
}
js.mu.Unlock()
// make sure src peer is first. Removal will drop peers from the left
for i := 0; i < len(currPeers); i++ {
if currPeers[i] == srcPeer {
currPeers[i] = currPeers[0]
currPeers[0] = srcPeer
break
}
}
if !streamFound {
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// if server was picked, make sure src peer exists and move it to first position.
// removal will drop peers from the left
if req.Server != _EMPTY_ {
if srcPeer == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
peerFound := false
for i := 0; i < len(currPeers); i++ {
if currPeers[i] == srcPeer {
copy(currPeers[1:], currPeers[:i])
currPeers[0] = srcPeer
peerFound = true
break
}
}
if !peerFound {
resp.Error = NewJSClusterPeerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
// make sure client is scoped to requested account
ciNew := *(ci)
ciNew.Account = req.Account

View File

@@ -1709,9 +1709,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
isRecovering := true
// For migration tracking.
var migrating bool
var peerGroup peerMigrateType
var mmt *time.Ticker
var mmtc <-chan time.Time
@@ -1757,7 +1754,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if sendSnapshot && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
}
for {
select {
case <-s.quitCh:
@@ -1824,15 +1820,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// We may receive a leader change after the stream assignment which would cancel us
// monitoring for this closely. So re-assess our state here as well.
migrating, peerGroup = mset.isMigrating(cc.meta.ID())
// Or the old leader is no longer part of the set and transferred leadership
// for this leader to resume with removal
migrating := mset.isMigrating()
// Check for migrations here. We set the state on the stream assignment update below.
if isLeader && migrating {
if peerGroup == oldPeerGroup {
startMigrationMonitoring()
} else {
stopMigrationMonitoring()
}
if isLeader && migrating && mmtc == nil {
doSnapshot()
startMigrationMonitoring()
}
// Here we are checking if we are not the leader but we have been asked to allow
@@ -1881,15 +1876,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case <-uch:
// We get this when we have a new stream assignment caused by an update. We want
// to know if we are migrating.
if cc == nil || cc.meta == nil {
migrating = false
} else {
migrating, peerGroup = mset.isMigrating(cc.meta.ID())
migrating := false
if cc != nil && cc.meta != nil {
migrating = mset.isMigrating()
}
// If we are migrating and in the old peer group and we are leader, monitor for the
// new peers to be caught up. We could not be leader yet, so we will do same check below
// on leadership change.
if isLeader && migrating && peerGroup == oldPeerGroup {
// If we are migrating, monitor for the new peers to be caught up.
if !migrating {
stopMigrationMonitoring()
} else if isLeader && mmtc == nil {
doSnapshot()
startMigrationMonitoring()
}
@@ -1909,13 +1903,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.checkClusterInfo(ci)
}
// Track the new peers and check the ones that are current.
if len(ci.Replicas)+1 != len(rg.Peers) {
continue
}
mset.mu.RLock()
replicas := mset.cfg.Replicas
mset.mu.RUnlock()
if len(rg.Peers) <= replicas {
// Migration no longer happening, so not our job anymore
stopMigrationMonitoring()
continue
}
@@ -1924,6 +1916,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
newPeerSet := rg.Peers[toSkip:]
currentCount := 0
firstPeer := _EMPTY_
foundSelf := false
selfId := cc.meta.ID()
for _, peer := range newPeerSet {
foundCurrent := peer == mset.leader
if !foundCurrent {
@@ -1942,12 +1936,21 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
firstPeer = peer
}
}
if peer == selfId {
foundSelf = true
}
}
// If all are current we are good, or if we have some offline and we have a quorum.
if quorum := replicas/2 + 1; currentCount >= quorum {
stopMigrationMonitoring()
// Remove the old peers and transfer leadership.
time.AfterFunc(2*time.Second, func() { js.removeOldPeers(mset, firstPeer, newPeerSet) })
// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
// stopMigrationMonitoring is invoked on actual leadership change or
// on the next tick when migration completed.
// In case these operations fail, the next tick will retry
if !foundSelf {
n.StepDown(firstPeer)
} else {
js.removeOldPeers(mset, selfId, newPeerSet)
}
}
case err := <-restoreDoneCh:
// We have completed a restore from snapshot on this server. The stream assignment has
@@ -2048,18 +2051,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
}
// When we are migrating, this denotes whether we ourselves are part of the old peer set or the new one.
// Both types will be running at the same time as we scale up to extend into the new cluster.
// Once detected we will use our type to dictate our behavior.
type peerMigrateType int8
const (
oldPeerGroup = peerMigrateType(iota)
newPeerGroup
)
// Determine if we are migrating and if so if we are part of the old or new set.
func (mset *stream) isMigrating(selfPeer string) (bool, peerMigrateType) {
// Determine if we are migrating
func (mset *stream) isMigrating() bool {
mset.mu.RLock()
js, sa := mset.js, mset.sa
mset.mu.RUnlock()
@@ -2070,23 +2063,13 @@ func (mset *stream) isMigrating(selfPeer string) (bool, peerMigrateType) {
// During migration we will always be R>1, even when we start R1.
// So if we do not have a group or node we know we are not migrating.
if sa == nil || sa.Group == nil || sa.Group.node == nil {
return false, oldPeerGroup
return false
}
// The sign of migration is if our group peer count != configured replica count.
if sa.Config.Replicas == len(sa.Group.Peers) {
return false, oldPeerGroup
return false
}
// So we believe we are migrating here, need to determine if we are the old set or new set.
for i, peer := range sa.Group.Peers {
if peer == selfPeer {
if i >= sa.Config.Replicas {
return true, newPeerGroup
}
break
}
}
return true, oldPeerGroup
return true
}
// resetClusteredState is called when a clustered stream had a sequence mismatch and needs to be reset.
@@ -2332,8 +2315,17 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
js.mu.RUnlock()
// We only need to do processing if this is us.
if peer := string(e.Data); peer == ourID {
mset.stop(true, false)
if peer := string(e.Data); peer == ourID && mset != nil {
// Double check here with the registered stream assignment.
shouldRemove := true
if sa := mset.streamAssignment(); sa != nil && sa.Group != nil {
js.mu.RLock()
shouldRemove = !sa.Group.isMember(ourID)
js.mu.RUnlock()
}
if shouldRemove {
mset.stop(true, false)
}
}
return nil
}
@@ -3701,7 +3693,17 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
js.mu.RUnlock()
if peer := string(e.Data); peer == ourID {
o.stopWithFlags(true, false, false, false)
shouldRemove := true
if mset := o.getStream(); mset != nil {
if sa := mset.streamAssignment(); sa != nil && sa.Group != nil {
js.mu.RLock()
shouldRemove = !sa.Group.isMember(ourID)
js.mu.RUnlock()
}
}
if shouldRemove {
o.stopWithFlags(true, false, false, false)
}
}
return nil
} else if e.Type == EntryAddPeer {
@@ -4251,7 +4253,7 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
}
// selectPeerGroup will select a group of peers to start a raft group.
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string) []string {
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string) (re []string) {
if cluster == _EMPTY_ || cfg == nil {
return nil
}
@@ -4287,6 +4289,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
ep = make(map[string]struct{})
for _, p := range existing {
ep[p] = struct{}{}
//TODO preload unique tag prefix
}
}

View File

@@ -3673,6 +3673,176 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
require_NoError(t, err)
}
// TestJetStreamClusterDoubleStreamReassignment repeatedly moves a replicated
// stream between four servers of one cluster (each tagged with its own server
// name) and verifies after every move that the stream drains completely from
// the source server, the peer list is reset to exactly three peers in the
// expected order, and messages survive each relocation.
func TestJetStreamClusterDoubleStreamReassignment(t *testing.T) {
	// Collect every server name via the template hook so we can later find
	// the one server the stream was NOT initially assigned to.
	server := map[string]struct{}{}
	sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
		func(serverName, clusterName, storeDir, conf string) string {
			server[serverName] = struct{}{}
			return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName)
		})
	defer sc.shutdown()
	// Client based API
	c := sc.randomCluster()
	srv := c.randomNonLeader()
	nc, js := jsClientConnect(t, srv)
	defer nc.Close()
	siCreate, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	// Start the move list with the three servers currently holding the stream.
	srvMoveList := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name}
	// determine empty server
	for _, s := range srvMoveList {
		delete(server, s)
	}
	// pick left over server in same cluster as other server
	for s := range server {
		// server name is prefixed with cluster name
		if strings.HasPrefix(s, c.name) {
			srvMoveList = append(srvMoveList, s)
			break
		}
	}
	servers := []*Server{
		c.serverByName(srvMoveList[0]),
		c.serverByName(srvMoveList[1]),
		c.serverByName(srvMoveList[2]),
		c.serverByName(srvMoveList[3]), // starts out empty
	}
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy})
	require_NoError(t, err)
	toSend := uint64(100)
	for i := uint64(0); i < toSend; i++ {
		_, err = js.Publish("foo", nil)
		require_NoError(t, err)
	}
	// System-account connection needed to issue the server stream-move request.
	ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
	require_NoError(t, err)
	defer ncsys.Close()
	// move issues a JSApiServerStreamMove request to evacuate the stream
	// from fromSrv, optionally constrained to placement tags toTags.
	move := func(fromSrv string, toTags ...string) {
		sEmpty := c.serverByName(fromSrv)
		jszBefore, err := sEmpty.Jsz(nil)
		require_NoError(t, err)
		require_True(t, jszBefore.Streams == 1)
		moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{
			Server: fromSrv, Account: "$G", Stream: "TEST", Tags: toTags})
		require_NoError(t, err)
		rmsg, err := ncsys.Request(JSApiServerStreamMove, moveReq, 100*time.Second)
		require_NoError(t, err)
		var moveResp JSApiStreamUpdateResponse
		require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp))
		require_True(t, moveResp.Error == nil)
	}
	// serverEmpty reports an error until fromSrv holds no streams, no
	// consumers, and no storage — i.e. the move fully drained it.
	serverEmpty := func(fromSrv string) error {
		if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil {
			return fmt.Errorf("could not fetch JS info for server: %v", err)
		} else if jszAfter.Streams != 0 {
			return fmt.Errorf("empty server still has %d streams", jszAfter.Streams)
		} else if jszAfter.Consumers != 0 {
			return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers)
		} else if jszAfter.Store != 0 {
			return fmt.Errorf("empty server still has %d storage", jszAfter.Store)
		}
		return nil
	}
	// moveComplete reports an error until the stream is fully relocated:
	// assigned to every server in expectedSet (in that exact peer order),
	// downsized back to R3, with all messages copied and a leader elected.
	moveComplete := func(toSrv string, expectedSet ...string) error {
		eSet := map[string]int{}
		for i, sExpected := range expectedSet {
			eSet[sExpected] = i
			s := c.serverByName(sExpected)
			if !s.JetStreamIsStreamAssigned("$G", "TEST") {
				return fmt.Errorf("expected stream to be assigned to %s", sExpected)
			}
			// test list order invariant
			js, cc := s.getJetStreamCluster()
			js.mu.Lock()
			if sa, ok := cc.streams["$G"]["TEST"]; !ok {
				js.mu.Unlock()
				return fmt.Errorf("stream not found in cluster")
			} else if len(sa.Group.Peers) != 3 {
				js.mu.Unlock()
				return fmt.Errorf("peers not reset")
			} else if sa.Group.Peers[i] != string(getHash(sExpected)) {
				js.mu.Unlock()
				return fmt.Errorf("expected peer %s on index %d, got %s/%s",
					sa.Group.Peers[i], i, string(getHash(sExpected)), sExpected)
			}
			js.mu.Unlock()
		}
		// Check message count and stream info from every server's viewpoint.
		for _, s := range servers {
			if jszAfter, err := c.serverByName(toSrv).Jsz(nil); err != nil {
				return fmt.Errorf("could not fetch JS info for server: %v", err)
			} else if jszAfter.Messages != toSend {
				return fmt.Errorf("messages not yet copied, got %d, expected %d", jszAfter.Messages, toSend)
			}
			nc, js := jsClientConnect(t, s)
			defer nc.Close()
			if si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)); err != nil {
				return fmt.Errorf("could not fetch stream info: %v", err)
			} else if len(si.Cluster.Replicas)+1 != si.Config.Replicas {
				return fmt.Errorf("not yet downsized replica should be empty has: %d %s",
					len(si.Cluster.Replicas), si.Cluster.Leader)
			} else if si.Cluster.Leader == _EMPTY_ {
				return fmt.Errorf("leader not found")
			} else if len(expectedSet) > 0 {
				if _, ok := eSet[si.Cluster.Leader]; !ok {
					return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Leader, eSet)
				} else if _, ok := eSet[si.Cluster.Replicas[0].Name]; !ok {
					return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Replicas[0].Name, eSet)
				} else if _, ok := eSet[si.Cluster.Replicas[1].Name]; !ok {
					return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Replicas[1].Name, eSet)
				}
			}
			nc.Close()
		}
		return nil
	}
	// moveAndCheck performs one move and waits until both the relocation
	// completed on the target and the source server is fully drained.
	moveAndCheck := func(from, to string, expectedSet ...string) {
		move(from, to)
		checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return moveComplete(to, expectedSet...) })
		checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return serverEmpty(from) })
	}
	checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { return serverEmpty(srvMoveList[3]) })
	// first iteration establishes order of server 0-2 (the internal order in the server could be 1,0,2)
	moveAndCheck(srvMoveList[0], srvMoveList[3])
	moveAndCheck(srvMoveList[1], srvMoveList[0])
	moveAndCheck(srvMoveList[2], srvMoveList[1])
	moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2])
	// second iteration iterates in order
	moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[1], srvMoveList[2], srvMoveList[3])
	moveAndCheck(srvMoveList[1], srvMoveList[0], srvMoveList[2], srvMoveList[3], srvMoveList[0])
	moveAndCheck(srvMoveList[2], srvMoveList[1], srvMoveList[3], srvMoveList[0], srvMoveList[1])
	moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2])
	// iterate in the opposite direction and establish order 2-0
	moveAndCheck(srvMoveList[2], srvMoveList[3], srvMoveList[0], srvMoveList[1], srvMoveList[3])
	moveAndCheck(srvMoveList[1], srvMoveList[2], srvMoveList[0], srvMoveList[3], srvMoveList[2])
	moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[1])
	moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
	// move server in the middle of list
	moveAndCheck(srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[3])
	moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[2], srvMoveList[3], srvMoveList[1])
	moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
	// repeatedly use end
	moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3])
	moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
	moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3])
	moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
}
func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
func(serverName, clusterName, storeDir, conf string) string {
@@ -3687,7 +3857,7 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
nc, js := jsClientConnect(t, srv)
defer nc.Close()
test := func(r int, moveTags []string, targetCluster string, testMigrateTo bool) {
test := func(r int, moveTags []string, targetCluster string, testMigrateTo bool, listFrom bool) {
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
@@ -3695,7 +3865,12 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
})
require_NoError(t, err)
defer js.DeleteStream("TEST")
toMoveFrom := si.Cluster.Leader
startSet := map[string]struct{}{
si.Cluster.Leader: {},
}
for _, p := range si.Cluster.Replicas {
startSet[p.Name] = struct{}{}
}
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
@@ -3708,8 +3883,12 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
require_NoError(t, err)
}
sEmpty := c.serverByName(toMoveFrom)
jszBefore, err := sEmpty.Jsz(nil)
toMoveFrom := si.Cluster.Leader
if !listFrom {
toMoveFrom = _EMPTY_
}
sLdr := c.serverByName(si.Cluster.Leader)
jszBefore, err := sLdr.Jsz(nil)
require_NoError(t, err)
require_True(t, jszBefore.Streams == 1)
require_True(t, jszBefore.Consumers >= 1)
@@ -3754,23 +3933,6 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp))
require_True(t, moveResp.Error == nil)
// test draining
checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error {
jszAfter, err := sEmpty.Jsz(nil)
if err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
}
if jszAfter.Streams != 0 {
return fmt.Errorf("empty server still has %d streams", jszAfter.Streams)
}
if jszAfter.Consumers != 0 {
return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers)
}
if jszAfter.Store != 0 {
return fmt.Errorf("empty server still has %d storage", jszAfter.Store)
}
return nil
})
// test move to particular server
if testMigrateTo {
toSrv := c.serverByName(migrateToServer)
@@ -3794,6 +3956,9 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
if si.Cluster.Leader == toMoveFrom {
return fmt.Errorf("peer not removed yet: %+v", toMoveFrom)
}
if si.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("no leader yet")
}
if len(si.Cluster.Replicas) != r-1 {
return fmt.Errorf("not yet downsized replica should be empty has: %d", len(si.Cluster.Replicas))
}
@@ -3813,6 +3978,46 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
}
return nil
})
// test draining
checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error {
if !listFrom {
// when needed determine which server move moved away from
si, err := js.StreamInfo("TEST", nats.MaxWait(2*time.Second))
require_NoError(t, err)
for n := range startSet {
if n != si.Cluster.Leader {
found := false
for _, p := range si.Cluster.Replicas {
if p.Name == n {
found = true
break
}
}
if !found {
toMoveFrom = n
}
}
}
}
if toMoveFrom == _EMPTY_ {
return fmt.Errorf("server to move away from not found")
}
sEmpty := c.serverByName(toMoveFrom)
jszAfter, err := sEmpty.Jsz(nil)
if err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
}
if jszAfter.Streams != 0 {
return fmt.Errorf("empty server still has %d streams", jszAfter.Streams)
}
if jszAfter.Consumers != 0 {
return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers)
}
if jszAfter.Store != 0 {
return fmt.Errorf("empty server still has %d storage", jszAfter.Store)
}
return nil
})
// consume messages from ephemeral consumer
for i := 0; i < 100; i++ {
_, err := sub.NextMsg(time.Second)
@@ -3822,15 +4027,21 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
for i := 1; i <= 3; i++ {
t.Run(fmt.Sprintf("r%d", i), func(t *testing.T) {
test(i, nil, "C1", false)
test(i, nil, "C1", false, true)
})
t.Run(fmt.Sprintf("r%d-explicit", i), func(t *testing.T) {
test(i, nil, "C1", true)
test(i, nil, "C1", true, true)
})
t.Run(fmt.Sprintf("r%d-nosrc", i), func(t *testing.T) {
test(i, nil, "C1", false, false)
})
}
t.Run("r3-cluster-move", func(t *testing.T) {
test(3, []string{"cluster:C2"}, "C2", false)
test(3, []string{"cluster:C2"}, "C2", false, false)
})
t.Run("r3-cluster-move-nosrc", func(t *testing.T) {
test(3, []string{"cluster:C2"}, "C2", false, true)
})
}
@@ -4073,7 +4284,7 @@ func TestJetStreamClusterCreateResponseAdvisoriesHaveSubject(t *testing.T) {
if err := json.Unmarshal(m.Data, &audit); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if audit.Subject == "" {
if audit.Subject == _EMPTY_ {
t.Fatalf("Expected subject, got nothing")
}
}
@@ -8670,15 +8881,19 @@ func TestJetStreamClusterStreamUpdateMissingBeginning(t *testing.T) {
nsl = c.restartServer(nsl)
c.waitOnStreamCurrent(nsl, "$G", "TEST")
mset, _ = nsl.GlobalAccount().lookupStream("TEST")
cloneState := mset.state()
mset, _ = c.streamLeader("$G", "TEST").GlobalAccount().lookupStream("TEST")
leaderState := mset.state()
if !reflect.DeepEqual(cloneState, leaderState) {
t.Fatalf("States do not match: %+v vs %+v", cloneState, leaderState)
}
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
mset, _ = nsl.GlobalAccount().lookupStream("TEST")
cloneState := mset.state()
if reflect.DeepEqual(cloneState, leaderState) {
return nil
} else {
return fmt.Errorf("States do not match: %+v vs %+v", cloneState, leaderState)
}
})
}
// Issue #2666

View File

@@ -4087,9 +4087,11 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
}
// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else {
n.Stop()
}
@@ -4137,6 +4139,16 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
// Clustered cleanup.
mset.mu.Unlock()
// Check if the stream assignment has the group node specified.
// We need this cleared for if the stream gets reassigned here.
if sa != nil {
js.mu.Lock()
if sa.Group != nil {
sa.Group.node = nil
}
js.mu.Unlock()
}
c.closeConnection(ClientClosed)
if sysc != nil {