Moving super-cluster tests from cluster tests file to supercluster file

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Author: Ivan Kozlovic
Date: 2022-07-27 17:14:19 -06:00
parent 88203dd5d5
commit 38727417df
2 changed files with 539 additions and 539 deletions


@@ -2648,3 +2648,542 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) {
m = getMsg(nc)
require_True(t, m.Header.Get(JSStream) == "M2")
}
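
// TestJetStreamSuperClusterMoveCancel exercises the stream move API followed by an
// immediate cancel: for each peer of an R3 stream it requests a move to the spare
// server of the cluster, cancels it, and verifies that the stream, its durable and
// its ephemeral consumer end up back on the original peer set while the spare
// server stays empty.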
func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
server := map[string]struct{}{}
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
func(serverName, clusterName, storeDir, conf string) string {
server[serverName] = struct{}{}
return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName)
})
defer sc.shutdown()
// Client based API
c := sc.randomCluster()
srv := c.randomNonLeader()
nc, js := jsClientConnect(t, srv)
defer nc.Close()
siCreate, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
streamPeerSrv := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name}
// determine empty server
for _, s := range streamPeerSrv {
delete(server, s)
}
	// pick the leftover server in the same cluster as the other peers
emptySrv := _EMPTY_
for s := range server {
// server name is prefixed with cluster name
if strings.HasPrefix(s, c.name) {
emptySrv = s
break
}
}
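	// Raft peers are identified by a hash of the server name, so build the
	// expected peer set from the hashes of the three servers hosting the stream.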
expectedPeers := map[string]struct{}{
string(getHash(streamPeerSrv[0])): {},
string(getHash(streamPeerSrv[1])): {},
string(getHash(streamPeerSrv[2])): {},
}
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{InactiveThreshold: time.Hour, AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
ephName := ci.Name
toSend := uint64(1_000)
for i := uint64(0); i < toSend; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}
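	// serverEmpty reports an error as long as the given server still hosts any
	// streams, consumers or message bytes.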
serverEmpty := func(fromSrv string) error {
if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
} else if jszAfter.Streams != 0 {
return fmt.Errorf("empty server still has %d streams", jszAfter.Streams)
} else if jszAfter.Consumers != 0 {
return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers)
		} else if jszAfter.Bytes != 0 {
			return fmt.Errorf("empty server still has %d bytes stored", jszAfter.Bytes)
}
return nil
}
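	// checkSrvInvariant verifies, under the JS cluster lock, that the stream and its
	// durable consumer are assigned to exactly the expected peers and that the
	// ephemeral consumer's single peer is one of them.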
checkSrvInvariant := func(s *Server, expectedPeers map[string]struct{}) error {
js, cc := s.getJetStreamCluster()
js.mu.Lock()
defer js.mu.Unlock()
if sa, ok := cc.streams["$G"]["TEST"]; !ok {
return fmt.Errorf("stream not found")
} else if len(sa.Group.Peers) != len(expectedPeers) {
return fmt.Errorf("stream peer group size not %d, but %d", len(expectedPeers), len(sa.Group.Peers))
} else if da, ok := sa.consumers["DUR"]; !ok {
return fmt.Errorf("durable not found")
} else if len(da.Group.Peers) != len(expectedPeers) {
return fmt.Errorf("durable peer group size not %d, but %d", len(expectedPeers), len(da.Group.Peers))
} else if ea, ok := sa.consumers[ephName]; !ok {
return fmt.Errorf("ephemeral not found")
} else if len(ea.Group.Peers) != 1 {
return fmt.Errorf("ephemeral peer group size not 1, but %d", len(ea.Group.Peers))
} else if _, ok := expectedPeers[ea.Group.Peers[0]]; !ok {
return fmt.Errorf("ephemeral peer not an expected peer")
} else {
for _, p := range sa.Group.Peers {
if _, ok := expectedPeers[p]; !ok {
return fmt.Errorf("peer not expected")
}
found := false
for _, dp := range da.Group.Peers {
if p == dp {
found = true
break
}
}
				if !found {
					return fmt.Errorf("durable peer group does not match stream peer group")
				}
}
}
return nil
}
ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer ncsys.Close()
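	// For each current peer, request a move to the spare server, cancel it right
	// away, and verify that the assignment snaps back to the original peer set.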
for _, moveFromSrv := range streamPeerSrv {
moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: moveFromSrv, Tags: []string{emptySrv}})
require_NoError(t, err)
rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 5*time.Second)
require_NoError(t, err)
var moveResp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp))
require_True(t, moveResp.Error == nil)
rmsg, err = ncsys.Request(fmt.Sprintf(JSApiServerStreamCancelMoveT, "$G", "TEST"), nil, 5*time.Second)
require_NoError(t, err)
var cancelResp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(rmsg.Data, &cancelResp))
		if cancelResp.Error != nil && ErrorIdentifier(cancelResp.Error.ErrCode) == JSStreamMoveNotInProgress {
			t.Skip("this can happen with delays, when the move completed before the cancel:", cancelResp.Error)
		}
require_True(t, cancelResp.Error == nil)
for _, sExpected := range streamPeerSrv {
s := c.serverByName(sExpected)
require_True(t, s.JetStreamIsStreamAssigned("$G", "TEST"))
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return checkSrvInvariant(s, expectedPeers) })
}
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { return serverEmpty(emptySrv) })
}
}
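
// TestJetStreamSuperClusterDoubleStreamMove repeatedly moves an R3 stream between the
// four servers of a cluster, checking after every move that the peer list is rebuilt
// in the expected order, that the consumers follow the stream, and that the vacated
// server is fully drained.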
func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) {
server := map[string]struct{}{}
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
func(serverName, clusterName, storeDir, conf string) string {
server[serverName] = struct{}{}
return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName)
})
defer sc.shutdown()
// Client based API
c := sc.randomCluster()
srv := c.randomNonLeader()
nc, js := jsClientConnect(t, srv)
defer nc.Close()
siCreate, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
srvMoveList := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name}
// determine empty server
for _, s := range srvMoveList {
delete(server, s)
}
	// pick the leftover server in the same cluster as the other peers
for s := range server {
// server name is prefixed with cluster name
if strings.HasPrefix(s, c.name) {
srvMoveList = append(srvMoveList, s)
break
}
}
servers := []*Server{
c.serverByName(srvMoveList[0]),
c.serverByName(srvMoveList[1]),
c.serverByName(srvMoveList[2]),
c.serverByName(srvMoveList[3]), // starts out empty
}
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{InactiveThreshold: time.Hour, AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
ephName := ci.Name
toSend := uint64(100)
for i := uint64(0); i < toSend; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}
ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer ncsys.Close()
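	// move asks the meta leader to move the stream off fromSrv and onto a server
	// matching toTags (server names double as placement tags in this test's config).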
	move := func(fromSrv string, toTags ...string) {
		sFrom := c.serverByName(fromSrv)
		jszBefore, err := sFrom.Jsz(nil)
		require_NoError(t, err)
		require_True(t, jszBefore.Streams == 1)
moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{
Server: fromSrv, Tags: toTags})
require_NoError(t, err)
rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second)
require_NoError(t, err)
var moveResp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp))
require_True(t, moveResp.Error == nil)
}
serverEmpty := func(fromSrv string) error {
if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
} else if jszAfter.Streams != 0 {
return fmt.Errorf("empty server still has %d streams", jszAfter.Streams)
} else if jszAfter.Consumers != 0 {
return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers)
} else if jszAfter.Store != 0 {
return fmt.Errorf("empty server still has %d storage", jszAfter.Store)
}
return nil
}
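	// moveComplete checks that the stream has fully arrived on toSrv: the peer list is
	// rebuilt in the order given by expectedSet, the durable and ephemeral consumers
	// sit on expected peers, all messages were copied, and the replica set is back to R3.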
moveComplete := func(toSrv string, expectedSet ...string) error {
eSet := map[string]int{}
foundInExpected := false
for i, sExpected := range expectedSet {
eSet[sExpected] = i
s := c.serverByName(sExpected)
if !s.JetStreamIsStreamAssigned("$G", "TEST") {
return fmt.Errorf("expected stream to be assigned to %s", sExpected)
}
// test list order invariant
js, cc := s.getJetStreamCluster()
sExpHash := string(getHash(sExpected))
js.mu.Lock()
if sa, ok := cc.streams["$G"]["TEST"]; !ok {
js.mu.Unlock()
return fmt.Errorf("stream not found in cluster")
} else if len(sa.Group.Peers) != 3 {
js.mu.Unlock()
return fmt.Errorf("peers not reset")
			} else if sa.Group.Peers[i] != sExpHash {
				js.mu.Unlock()
				return fmt.Errorf("stream: expected peer %s/%s at index %d, got %s",
					sExpHash, sExpected, i, sa.Group.Peers[i])
} else if ca, ok := sa.consumers["DUR"]; !ok {
js.mu.Unlock()
return fmt.Errorf("durable not found in stream")
} else {
found := false
for _, peer := range ca.Group.Peers {
if peer == sExpHash {
found = true
break
}
}
				if !found {
					js.mu.Unlock()
					return fmt.Errorf("consumer: expected peer %s/%s but didn't find it in %+v",
						sExpHash, sExpected, ca.Group.Peers)
				}
				if ephA, ok := sa.consumers[ephName]; ok {
					if len(ephA.Group.Peers) != 1 {
						js.mu.Unlock()
						return fmt.Errorf("ephemeral peers not reset")
					}
					foundInExpected = foundInExpected || (ephA.Group.Peers[0] == cc.meta.ID())
				}
}
js.mu.Unlock()
}
if len(expectedSet) > 0 && !foundInExpected {
return fmt.Errorf("ephemeral peer not expected")
}
for _, s := range servers {
if jszAfter, err := c.serverByName(toSrv).Jsz(nil); err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
} else if jszAfter.Messages != toSend {
return fmt.Errorf("messages not yet copied, got %d, expected %d", jszAfter.Messages, toSend)
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
if si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)); err != nil {
return fmt.Errorf("could not fetch stream info: %v", err)
			} else if len(si.Cluster.Replicas)+1 != si.Config.Replicas {
				return fmt.Errorf("replica set not yet downsized: has %d replicas, leader %s",
					len(si.Cluster.Replicas), si.Cluster.Leader)
} else if si.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("leader not found")
} else if len(expectedSet) > 0 {
if _, ok := eSet[si.Cluster.Leader]; !ok {
return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Leader, eSet)
				} else if _, ok := eSet[si.Cluster.Replicas[0].Name]; !ok {
					return fmt.Errorf("replica %s not in expected set %+v", si.Cluster.Replicas[0].Name, eSet)
				} else if _, ok := eSet[si.Cluster.Replicas[1].Name]; !ok {
					return fmt.Errorf("replica %s not in expected set %+v", si.Cluster.Replicas[1].Name, eSet)
}
}
nc.Close()
}
return nil
}
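	// moveAndCheck issues a move, waits for it to complete on the target, then waits
	// for the source server to drain.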
moveAndCheck := func(from, to string, expectedSet ...string) {
move(from, to)
checkFor(t, 40*time.Second, 100*time.Millisecond, func() error { return moveComplete(to, expectedSet...) })
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return serverEmpty(from) })
}
checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { return serverEmpty(srvMoveList[3]) })
	// the first iteration establishes the order of servers 0-2 (the internal order in the server could be 1,0,2)
moveAndCheck(srvMoveList[0], srvMoveList[3])
moveAndCheck(srvMoveList[1], srvMoveList[0])
moveAndCheck(srvMoveList[2], srvMoveList[1])
moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2])
// second iteration iterates in order
moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[1], srvMoveList[2], srvMoveList[3])
moveAndCheck(srvMoveList[1], srvMoveList[0], srvMoveList[2], srvMoveList[3], srvMoveList[0])
moveAndCheck(srvMoveList[2], srvMoveList[1], srvMoveList[3], srvMoveList[0], srvMoveList[1])
moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2])
// iterate in the opposite direction and establish order 2-0
moveAndCheck(srvMoveList[2], srvMoveList[3], srvMoveList[0], srvMoveList[1], srvMoveList[3])
moveAndCheck(srvMoveList[1], srvMoveList[2], srvMoveList[0], srvMoveList[3], srvMoveList[2])
moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[1])
moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
	// move a server from the middle of the list
moveAndCheck(srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[3])
moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[2], srvMoveList[3], srvMoveList[1])
moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
	// repeatedly move back and forth between the first and the last server
moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3])
moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3])
moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
}
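
// TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment covers tag-driven
// moves: for R1-R3 streams it evacuates a peer (optionally naming the source server
// and/or an explicit target server tag) and, for R3, moves the stream to another
// cluster, verifying placement, draining of the source, and no message loss.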
func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
	sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
func(serverName, clusterName, storeDir, conf string) string {
return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName)
})
	defer sc.shutdown()
	c := sc.clusterForName("C1")
// Client based API
srv := c.randomNonLeader()
nc, js := jsClientConnect(t, srv)
defer nc.Close()
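	// test creates an R(r) stream, moves it according to moveTags (optionally naming
	// the source server via listFrom and an explicit target server via testMigrateTo),
	// and verifies it lands in targetCluster with the source drained and all messages
	// still consumable.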
test := func(r int, moveTags []string, targetCluster string, testMigrateTo bool, listFrom bool) {
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: r,
})
require_NoError(t, err)
defer js.DeleteStream("TEST")
startSet := map[string]struct{}{
si.Cluster.Leader: {},
}
for _, p := range si.Cluster.Replicas {
startSet[p.Name] = struct{}{}
}
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
sub, err := js.SubscribeSync("foo")
require_NoError(t, err)
for i := 0; i < 100; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}
toMoveFrom := si.Cluster.Leader
if !listFrom {
toMoveFrom = _EMPTY_
}
sLdr := c.serverByName(si.Cluster.Leader)
jszBefore, err := sLdr.Jsz(nil)
require_NoError(t, err)
require_True(t, jszBefore.Streams == 1)
require_True(t, jszBefore.Consumers >= 1)
require_True(t, jszBefore.Store != 0)
migrateToServer := _EMPTY_
if testMigrateTo {
// find an empty server
for _, s := range c.servers {
name := s.Name()
found := si.Cluster.Leader == name
if !found {
for _, r := range si.Cluster.Replicas {
if r.Name == name {
found = true
break
}
}
}
if !found {
migrateToServer = name
break
}
}
jszAfter, err := c.serverByName(migrateToServer).Jsz(nil)
require_NoError(t, err)
require_True(t, jszAfter.Streams == 0)
moveTags = append(moveTags, fmt.Sprintf("server:%s", migrateToServer))
}
ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer ncsys.Close()
moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{
Server: toMoveFrom, Tags: moveTags})
require_NoError(t, err)
rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second)
require_NoError(t, err)
var moveResp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp))
require_True(t, moveResp.Error == nil)
// test move to particular server
if testMigrateTo {
toSrv := c.serverByName(migrateToServer)
checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error {
jszAfter, err := toSrv.Jsz(nil)
if err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
}
if jszAfter.Streams != 1 {
return fmt.Errorf("server expected to have one stream, has %d", jszAfter.Streams)
}
return nil
})
}
		// Now wait until the stream is current.
checkFor(t, 50*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("could not fetch stream info: %v", err)
}
if si.Cluster.Leader == toMoveFrom {
return fmt.Errorf("peer not removed yet: %+v", toMoveFrom)
}
if si.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("no leader yet")
}
if len(si.Cluster.Replicas) != r-1 {
return fmt.Errorf("not yet downsized replica should be %d has: %d", r-1, len(si.Cluster.Replicas))
}
if si.Config.Replicas != r {
return fmt.Errorf("bad replica count %d", si.Config.Replicas)
}
if si.Cluster.Name != targetCluster {
return fmt.Errorf("stream expected in %s but found in %s", si.Cluster.Name, targetCluster)
}
			sNew := sc.serverByName(si.Cluster.Leader)
if jszNew, err := sNew.Jsz(nil); err != nil {
return err
} else if jszNew.Streams != 1 {
return fmt.Errorf("new leader has %d streams, not one", jszNew.Streams)
} else if jszNew.Store != jszBefore.Store {
return fmt.Errorf("new leader has %d storage, should have %d", jszNew.Store, jszBefore.Store)
}
return nil
})
// test draining
checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error {
if !listFrom {
				// when not specified, determine which server the move migrated away from
si, err := js.StreamInfo("TEST", nats.MaxWait(2*time.Second))
require_NoError(t, err)
for n := range startSet {
if n != si.Cluster.Leader {
found := false
for _, p := range si.Cluster.Replicas {
if p.Name == n {
found = true
break
}
}
if !found {
toMoveFrom = n
}
}
}
}
if toMoveFrom == _EMPTY_ {
return fmt.Errorf("server to move away from not found")
}
sEmpty := c.serverByName(toMoveFrom)
jszAfter, err := sEmpty.Jsz(nil)
if err != nil {
return fmt.Errorf("could not fetch JS info for server: %v", err)
}
if jszAfter.Streams != 0 {
return fmt.Errorf("empty server still has %d streams", jszAfter.Streams)
}
if jszAfter.Consumers != 0 {
return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers)
}
if jszAfter.Store != 0 {
return fmt.Errorf("empty server still has %d storage", jszAfter.Store)
}
return nil
})
// consume messages from ephemeral consumer
for i := 0; i < 100; i++ {
_, err := sub.NextMsg(time.Second)
require_NoError(t, err)
}
}
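	// Run the matrix: for each replica count, a plain move, a move to an explicitly
	// tagged target server, and a move without naming the source; then R3 moves to
	// another cluster, with and without naming the source.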
for i := 1; i <= 3; i++ {
t.Run(fmt.Sprintf("r%d", i), func(t *testing.T) {
test(i, nil, "C1", false, true)
})
t.Run(fmt.Sprintf("r%d-explicit", i), func(t *testing.T) {
test(i, nil, "C1", true, true)
})
t.Run(fmt.Sprintf("r%d-nosrc", i), func(t *testing.T) {
test(i, nil, "C1", false, false)
})
}
t.Run("r3-cluster-move", func(t *testing.T) {
		test(3, []string{"cluster:C2"}, "C2", false, true)
})
t.Run("r3-cluster-move-nosrc", func(t *testing.T) {
		test(3, []string{"cluster:C2"}, "C2", false, false)
})
}