From 38727417df998935210b492d0b353eca05498baa Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 27 Jul 2022 17:14:19 -0600 Subject: [PATCH] Moving super-cluster tests from cluster tests file to supercluster file Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster_test.go | 539 ------------------------- server/jetstream_super_cluster_test.go | 539 +++++++++++++++++++++++++ 2 files changed, 539 insertions(+), 539 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 8ca2b8ca..9b70c741 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3762,343 +3762,6 @@ func TestJetStreamClusterScaleConsumer(t *testing.T) { } } -func TestJetStreamClusterMoveCancel(t *testing.T) { - server := map[string]struct{}{} - sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, - func(serverName, clusterName, storeDir, conf string) string { - server[serverName] = struct{}{} - return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) - }) - defer sc.shutdown() - - // Client based API - c := sc.randomCluster() - srv := c.randomNonLeader() - nc, js := jsClientConnect(t, srv) - defer nc.Close() - - siCreate, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - streamPeerSrv := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name} - // determine empty server - for _, s := range streamPeerSrv { - delete(server, s) - } - // pick left over server in same cluster as other server - emptySrv := _EMPTY_ - for s := range server { - // server name is prefixed with cluster name - if strings.HasPrefix(s, c.name) { - emptySrv = s - break - } - } - - expectedPeers := map[string]struct{}{ - string(getHash(streamPeerSrv[0])): {}, - string(getHash(streamPeerSrv[1])): {}, - string(getHash(streamPeerSrv[2])): {}, - } - - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) - require_NoError(t, err) - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{InactiveThreshold: time.Hour, AckPolicy: nats.AckExplicitPolicy}) - require_NoError(t, err) - ephName := ci.Name - - toSend := uint64(1_000) - for i := uint64(0); i < toSend; i++ { - _, err = js.Publish("foo", nil) - require_NoError(t, err) - } - - serverEmpty := func(fromSrv string) error { - if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil { - return fmt.Errorf("could not fetch JS info for server: %v", err) - } else if jszAfter.Streams != 0 { - return fmt.Errorf("empty server still has %d streams", jszAfter.Streams) - } else if jszAfter.Consumers != 0 { - return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers) - } else if jszAfter.Bytes != 0 { - return fmt.Errorf("empty server still has %d storage", jszAfter.Store) - } - return nil - } - - checkSrvInvariant := func(s *Server, expectedPeers map[string]struct{}) error { - js, cc := s.getJetStreamCluster() - js.mu.Lock() - defer js.mu.Unlock() - if sa, ok := cc.streams["$G"]["TEST"]; !ok { - return fmt.Errorf("stream not found") - } else if len(sa.Group.Peers) != len(expectedPeers) { - return fmt.Errorf("stream peer group size not %d, but %d", len(expectedPeers), len(sa.Group.Peers)) - } else if da, ok := sa.consumers["DUR"]; !ok { - return fmt.Errorf("durable not found") - } else if len(da.Group.Peers) != len(expectedPeers) { - return fmt.Errorf("durable peer group size not %d, but %d", len(expectedPeers), len(da.Group.Peers)) - } else if ea, ok := sa.consumers[ephName]; !ok { - return fmt.Errorf("ephemeral not found") - } else if len(ea.Group.Peers) != 1 { - return fmt.Errorf("ephemeral peer group size not 1, but %d", len(ea.Group.Peers)) - } else if _, ok := expectedPeers[ea.Group.Peers[0]]; !ok { - return fmt.Errorf("ephemeral peer not an expected peer") - } else { - for _, p := range sa.Group.Peers { - if _, ok := expectedPeers[p]; !ok { - return fmt.Errorf("peer not expected") - } - found := false - for _, dp := range da.Group.Peers { - if p == dp { - found = true - break - } - } - if !found { - fmt.Printf("durable peer group does not match stream peer group") - } - } - } - return nil - } - - ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) - require_NoError(t, err) - defer ncsys.Close() - - for _, moveFromSrv := range streamPeerSrv { - moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: moveFromSrv, Tags: []string{emptySrv}}) - require_NoError(t, err) - rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 5*time.Second) - require_NoError(t, err) - var moveResp JSApiStreamUpdateResponse - require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp)) - require_True(t, moveResp.Error == nil) - - rmsg, err = ncsys.Request(fmt.Sprintf(JSApiServerStreamCancelMoveT, "$G", "TEST"), nil, 5*time.Second) - require_NoError(t, err) - var cancelResp JSApiStreamUpdateResponse - require_NoError(t, json.Unmarshal(rmsg.Data, &cancelResp)) - if cancelResp.Error != nil && ErrorIdentifier(cancelResp.Error.ErrCode) == JSStreamMoveNotInProgress { - t.Skip("This can happen with delays, when Move completed before Cancel", cancelResp.Error) - return - } - require_True(t, cancelResp.Error == nil) - - for _, sExpected := range streamPeerSrv { - s := c.serverByName(sExpected) - require_True(t, s.JetStreamIsStreamAssigned("$G", "TEST")) - checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return checkSrvInvariant(s, expectedPeers) }) - } - checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { return serverEmpty(emptySrv) }) - } -} - -func TestJetStreamClusterDoubleStreamMove(t *testing.T) { - server := map[string]struct{}{} - sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, - func(serverName, clusterName, storeDir, conf string) string { - server[serverName] = struct{}{} - return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) - }) - defer sc.shutdown() - - // Client based API - c := sc.randomCluster() - srv := c.randomNonLeader() - nc, js := jsClientConnect(t, srv) - defer nc.Close() - - siCreate, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - srvMoveList := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name} - // determine empty server - for _, s := range srvMoveList { - delete(server, s) - } - // pick left over server in same cluster as other server - for s := range server { - // server name is prefixed with cluster name - if strings.HasPrefix(s, c.name) { - srvMoveList = append(srvMoveList, s) - break - } - } - - servers := []*Server{ - c.serverByName(srvMoveList[0]), - c.serverByName(srvMoveList[1]), - c.serverByName(srvMoveList[2]), - c.serverByName(srvMoveList[3]), // starts out empty - } - - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) - require_NoError(t, err) - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{InactiveThreshold: time.Hour, AckPolicy: nats.AckExplicitPolicy}) - require_NoError(t, err) - ephName := ci.Name - - toSend := uint64(100) - for i := uint64(0); i < toSend; i++ { - _, err = js.Publish("foo", nil) - require_NoError(t, err) - } - - ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) - require_NoError(t, err) - defer ncsys.Close() - - move := func(fromSrv string, toTags ...string) { - sEmpty := c.serverByName(fromSrv) - jszBefore, err := sEmpty.Jsz(nil) - require_NoError(t, err) - require_True(t, jszBefore.Streams == 1) - - moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{ - Server: fromSrv, Tags: toTags}) - require_NoError(t, err) - rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second) - require_NoError(t, err) - var moveResp JSApiStreamUpdateResponse - require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp)) - require_True(t, moveResp.Error == nil) - } - - serverEmpty := func(fromSrv string) error { - if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil { - return fmt.Errorf("could not fetch JS info for server: %v", err) - } else if jszAfter.Streams != 0 { - return fmt.Errorf("empty server still has %d streams", jszAfter.Streams) - } else if jszAfter.Consumers != 0 { - return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers) - } else if jszAfter.Store != 0 { - return fmt.Errorf("empty server still has %d storage", jszAfter.Store) - } - return nil - } - - moveComplete := func(toSrv string, expectedSet ...string) error { - eSet := map[string]int{} - foundInExpected := false - for i, sExpected := range expectedSet { - eSet[sExpected] = i - s := c.serverByName(sExpected) - if !s.JetStreamIsStreamAssigned("$G", "TEST") { - return fmt.Errorf("expected stream to be assigned to %s", sExpected) - } - // test list order invariant - js, cc := s.getJetStreamCluster() - sExpHash := string(getHash(sExpected)) - js.mu.Lock() - if sa, ok := cc.streams["$G"]["TEST"]; !ok { - js.mu.Unlock() - return fmt.Errorf("stream not found in cluster") - } else if len(sa.Group.Peers) != 3 { - js.mu.Unlock() - return fmt.Errorf("peers not reset") - } else if sa.Group.Peers[i] != sExpHash { - js.mu.Unlock() - return fmt.Errorf("stream: expected peer %s on index %d, got %s/%s", - sa.Group.Peers[i], i, sExpHash, sExpected) - } else if ca, ok := sa.consumers["DUR"]; !ok { - js.mu.Unlock() - return fmt.Errorf("durable not found in stream") - } else { - found := false - for _, peer := range ca.Group.Peers { - if peer == sExpHash { - found = true - break - } - } - if !found { - js.mu.Unlock() - return fmt.Errorf("consumer expected peer %s/%s bud didn't find in %+v", - sExpHash, sExpected, ca.Group.Peers) - } - if ephA, ok := sa.consumers[ephName]; ok { - if len(ephA.Group.Peers) != 1 { - return fmt.Errorf("ephemeral peers not reset") - } - foundInExpected = foundInExpected || (ephA.Group.Peers[0] == cc.meta.ID()) - } - } - js.mu.Unlock() - } - if len(expectedSet) > 0 && !foundInExpected { - return fmt.Errorf("ephemeral peer not expected") - } - for _, s := range servers { - if jszAfter, err := c.serverByName(toSrv).Jsz(nil); err != nil { - return fmt.Errorf("could not fetch JS info for server: %v", err) - } else if jszAfter.Messages != toSend { - return fmt.Errorf("messages not yet copied, got %d, expected %d", jszAfter.Messages, toSend) - } - nc, js := jsClientConnect(t, s) - defer nc.Close() - if si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)); err != nil { - return fmt.Errorf("could not fetch stream info: %v", err) - } else if len(si.Cluster.Replicas)+1 != si.Config.Replicas { - return fmt.Errorf("not yet downsized replica should be empty has: %d %s", - len(si.Cluster.Replicas), si.Cluster.Leader) - } else if si.Cluster.Leader == _EMPTY_ { - return fmt.Errorf("leader not found") - } else if len(expectedSet) > 0 { - if _, ok := eSet[si.Cluster.Leader]; !ok { - return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Leader, eSet) - } else if _, ok := eSet[si.Cluster.Replicas[0].Name]; !ok { - return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Replicas[0].Name, eSet) - } else if _, ok := eSet[si.Cluster.Replicas[1].Name]; !ok { - return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Replicas[1].Name, eSet) - } - } - nc.Close() - } - return nil - } - - moveAndCheck := func(from, to string, expectedSet ...string) { - move(from, to) - checkFor(t, 40*time.Second, 100*time.Millisecond, func() error { return moveComplete(to, expectedSet...) }) - checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return serverEmpty(from) }) - } - - checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { return serverEmpty(srvMoveList[3]) }) - // first iteration establishes order of server 0-2 (the internal order in the server could be 1,0,2) - moveAndCheck(srvMoveList[0], srvMoveList[3]) - moveAndCheck(srvMoveList[1], srvMoveList[0]) - moveAndCheck(srvMoveList[2], srvMoveList[1]) - moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2]) - // second iteration iterates in order - moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[1], srvMoveList[2], srvMoveList[3]) - moveAndCheck(srvMoveList[1], srvMoveList[0], srvMoveList[2], srvMoveList[3], srvMoveList[0]) - moveAndCheck(srvMoveList[2], srvMoveList[1], srvMoveList[3], srvMoveList[0], srvMoveList[1]) - moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2]) - // iterate in the opposite direction and establish order 2-0 - moveAndCheck(srvMoveList[2], srvMoveList[3], srvMoveList[0], srvMoveList[1], srvMoveList[3]) - moveAndCheck(srvMoveList[1], srvMoveList[2], srvMoveList[0], srvMoveList[3], srvMoveList[2]) - moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[1]) - moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) - // move server in the middle of list - moveAndCheck(srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[3]) - moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[2], srvMoveList[3], srvMoveList[1]) - moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) - // repeatedly use end - moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3]) - moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) - moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3]) - moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) -} - func TestJetStreamClusterConsumerScaleUp(t *testing.T) { c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, 3, 22020, true) defer c.shutdown() @@ -4144,208 +3807,6 @@ func TestJetStreamClusterConsumerScaleUp(t *testing.T) { c.waitOnConsumerLeader("$G", "TEST", "DUR") } -func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) { - s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, - func(serverName, clusterName, storeDir, conf string) string { - return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName) - }) - defer s.shutdown() - - c := s.clusterForName("C1") - - // Client based API - srv := c.randomNonLeader() - nc, js := jsClientConnect(t, srv) - defer nc.Close() - - test := func(r int, moveTags []string, targetCluster string, testMigrateTo bool, listFrom bool) { - si, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: r, - }) - require_NoError(t, err) - defer js.DeleteStream("TEST") - startSet := map[string]struct{}{ - si.Cluster.Leader: {}, - } - for _, p := range si.Cluster.Replicas { - startSet[p.Name] = struct{}{} - } - - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) - require_NoError(t, err) - - sub, err := js.SubscribeSync("foo") - require_NoError(t, err) - - for i := 0; i < 100; i++ { - _, err = js.Publish("foo", nil) - require_NoError(t, err) - } - - toMoveFrom := si.Cluster.Leader - if !listFrom { - toMoveFrom = _EMPTY_ - } - sLdr := c.serverByName(si.Cluster.Leader) - jszBefore, err := sLdr.Jsz(nil) - require_NoError(t, err) - require_True(t, jszBefore.Streams == 1) - require_True(t, jszBefore.Consumers >= 1) - require_True(t, jszBefore.Store != 0) - - migrateToServer := _EMPTY_ - if testMigrateTo { - // find an empty server - for _, s := range c.servers { - name := s.Name() - found := si.Cluster.Leader == name - if !found { - for _, r := range si.Cluster.Replicas { - if r.Name == name { - found = true - break - } - } - } - if !found { - migrateToServer = name - break - } - } - jszAfter, err := c.serverByName(migrateToServer).Jsz(nil) - require_NoError(t, err) - require_True(t, jszAfter.Streams == 0) - - moveTags = append(moveTags, fmt.Sprintf("server:%s", migrateToServer)) - } - - ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) - require_NoError(t, err) - defer ncsys.Close() - - moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{ - Server: toMoveFrom, Tags: moveTags}) - require_NoError(t, err) - rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second) - require_NoError(t, err) - var moveResp JSApiStreamUpdateResponse - require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp)) - require_True(t, moveResp.Error == nil) - - // test move to particular server - if testMigrateTo { - toSrv := c.serverByName(migrateToServer) - checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { - jszAfter, err := toSrv.Jsz(nil) - if err != nil { - return fmt.Errorf("could not fetch JS info for server: %v", err) - } - if jszAfter.Streams != 1 { - return fmt.Errorf("server expected to have one stream, has %d", jszAfter.Streams) - } - return nil - }) - } - // Now wait until the stream is now current. - checkFor(t, 50*time.Second, 100*time.Millisecond, func() error { - si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)) - if err != nil { - return fmt.Errorf("could not fetch stream info: %v", err) - } - if si.Cluster.Leader == toMoveFrom { - return fmt.Errorf("peer not removed yet: %+v", toMoveFrom) - } - if si.Cluster.Leader == _EMPTY_ { - return fmt.Errorf("no leader yet") - } - if len(si.Cluster.Replicas) != r-1 { - return fmt.Errorf("not yet downsized replica should be %d has: %d", r-1, len(si.Cluster.Replicas)) - } - if si.Config.Replicas != r { - return fmt.Errorf("bad replica count %d", si.Config.Replicas) - } - if si.Cluster.Name != targetCluster { - return fmt.Errorf("stream expected in %s but found in %s", si.Cluster.Name, targetCluster) - } - sNew := s.serverByName(si.Cluster.Leader) - if jszNew, err := sNew.Jsz(nil); err != nil { - return err - } else if jszNew.Streams != 1 { - return fmt.Errorf("new leader has %d streams, not one", jszNew.Streams) - } else if jszNew.Store != jszBefore.Store { - return fmt.Errorf("new leader has %d storage, should have %d", jszNew.Store, jszBefore.Store) - } - return nil - }) - // test draining - checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { - if !listFrom { - // when needed determine which server move moved away from - si, err := js.StreamInfo("TEST", nats.MaxWait(2*time.Second)) - require_NoError(t, err) - for n := range startSet { - if n != si.Cluster.Leader { - found := false - for _, p := range si.Cluster.Replicas { - if p.Name == n { - found = true - break - } - } - if !found { - toMoveFrom = n - } - } - } - } - if toMoveFrom == _EMPTY_ { - return fmt.Errorf("server to move away from not found") - } - sEmpty := c.serverByName(toMoveFrom) - jszAfter, err := sEmpty.Jsz(nil) - if err != nil { - return fmt.Errorf("could not fetch JS info for server: %v", err) - } - if jszAfter.Streams != 0 { - return fmt.Errorf("empty server still has %d streams", jszAfter.Streams) - } - if jszAfter.Consumers != 0 { - return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers) - } - if jszAfter.Store != 0 { - return fmt.Errorf("empty server still has %d storage", jszAfter.Store) - } - return nil - }) - // consume messages from ephemeral consumer - for i := 0; i < 100; i++ { - _, err := sub.NextMsg(time.Second) - require_NoError(t, err) - } - } - - for i := 1; i <= 3; i++ { - t.Run(fmt.Sprintf("r%d", i), func(t *testing.T) { - test(i, nil, "C1", false, true) - }) - t.Run(fmt.Sprintf("r%d-explicit", i), func(t *testing.T) { - test(i, nil, "C1", true, true) - }) - t.Run(fmt.Sprintf("r%d-nosrc", i), func(t *testing.T) { - test(i, nil, "C1", false, false) - }) - } - - t.Run("r3-cluster-move", func(t *testing.T) { - test(3, []string{"cluster:C2"}, "C2", false, false) - }) - t.Run("r3-cluster-move-nosrc", func(t *testing.T) { - test(3, []string{"cluster:C2"}, "C2", false, true) - }) -} - func TestJetStreamClusterPeerOffline(t *testing.T) { c := createJetStreamClusterExplicit(t, "R5S", 5) defer c.shutdown() diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 4b2abe7f..3d8086f6 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2648,3 +2648,542 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) { m = getMsg(nc) require_True(t, m.Header.Get(JSStream) == "M2") } + +func TestJetStreamSuperClusterMoveCancel(t *testing.T) { + server := map[string]struct{}{} + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, + func(serverName, clusterName, storeDir, conf string) string { + server[serverName] = struct{}{} + return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) + }) + defer sc.shutdown() + + // Client based API + c := sc.randomCluster() + srv := c.randomNonLeader() + nc, js := jsClientConnect(t, srv) + defer nc.Close() + + siCreate, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + streamPeerSrv := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name} + // determine empty server + for _, s := range streamPeerSrv { + delete(server, s) + } + // pick left over server in same cluster as other server + emptySrv := _EMPTY_ + for s := range server { + // server name is prefixed with cluster name + if strings.HasPrefix(s, c.name) { + emptySrv = s + break + } + } + + expectedPeers := map[string]struct{}{ + string(getHash(streamPeerSrv[0])): {}, + string(getHash(streamPeerSrv[1])): {}, + string(getHash(streamPeerSrv[2])): {}, + } + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{InactiveThreshold: time.Hour, AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + ephName := ci.Name + + toSend := uint64(1_000) + for i := uint64(0); i < toSend; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + serverEmpty := func(fromSrv string) error { + if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil { + return fmt.Errorf("could not fetch JS info for server: %v", err) + } else if jszAfter.Streams != 0 { + return fmt.Errorf("empty server still has %d streams", jszAfter.Streams) + } else if jszAfter.Consumers != 0 { + return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers) + } else if jszAfter.Bytes != 0 { + return fmt.Errorf("empty server still has %d storage", jszAfter.Store) + } + return nil + } + + checkSrvInvariant := func(s *Server, expectedPeers map[string]struct{}) error { + js, cc := s.getJetStreamCluster() + js.mu.Lock() + defer js.mu.Unlock() + if sa, ok := cc.streams["$G"]["TEST"]; !ok { + return fmt.Errorf("stream not found") + } else if len(sa.Group.Peers) != len(expectedPeers) { + return fmt.Errorf("stream peer group size not %d, but %d", len(expectedPeers), len(sa.Group.Peers)) + } else if da, ok := sa.consumers["DUR"]; !ok { + return fmt.Errorf("durable not found") + } else if len(da.Group.Peers) != len(expectedPeers) { + return fmt.Errorf("durable peer group size not %d, but %d", len(expectedPeers), len(da.Group.Peers)) + } else if ea, ok := sa.consumers[ephName]; !ok { + return fmt.Errorf("ephemeral not found") + } else if len(ea.Group.Peers) != 1 { + return fmt.Errorf("ephemeral peer group size not 1, but %d", len(ea.Group.Peers)) + } else if _, ok := expectedPeers[ea.Group.Peers[0]]; !ok { + return fmt.Errorf("ephemeral peer not an expected peer") + } else { + for _, p := range sa.Group.Peers { + if _, ok := expectedPeers[p]; !ok { + return fmt.Errorf("peer not expected") + } + found := false + for _, dp := range da.Group.Peers { + if p == dp { + found = true + break + } + } + if !found { + fmt.Printf("durable peer group does not match stream peer group") + } + } + } + return nil + } + + ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer ncsys.Close() + + for _, moveFromSrv := range streamPeerSrv { + moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: moveFromSrv, Tags: []string{emptySrv}}) + require_NoError(t, err) + rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 5*time.Second) + require_NoError(t, err) + var moveResp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp)) + require_True(t, moveResp.Error == nil) + + rmsg, err = ncsys.Request(fmt.Sprintf(JSApiServerStreamCancelMoveT, "$G", "TEST"), nil, 5*time.Second) + require_NoError(t, err) + var cancelResp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &cancelResp)) + if cancelResp.Error != nil && ErrorIdentifier(cancelResp.Error.ErrCode) == JSStreamMoveNotInProgress { + t.Skip("This can happen with delays, when Move completed before Cancel", cancelResp.Error) + return + } + require_True(t, cancelResp.Error == nil) + + for _, sExpected := range streamPeerSrv { + s := c.serverByName(sExpected) + require_True(t, s.JetStreamIsStreamAssigned("$G", "TEST")) + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return checkSrvInvariant(s, expectedPeers) }) + } + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { return serverEmpty(emptySrv) }) + } +} + +func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) { + server := map[string]struct{}{} + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, + func(serverName, clusterName, storeDir, conf string) string { + server[serverName] = struct{}{} + return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) + }) + defer sc.shutdown() + + // Client based API + c := sc.randomCluster() + srv := c.randomNonLeader() + nc, js := jsClientConnect(t, srv) + defer nc.Close() + + siCreate, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + srvMoveList := []string{siCreate.Cluster.Leader, siCreate.Cluster.Replicas[0].Name, siCreate.Cluster.Replicas[1].Name} + // determine empty server + for _, s := range srvMoveList { + delete(server, s) + } + // pick left over server in same cluster as other server + for s := range server { + // server name is prefixed with cluster name + if strings.HasPrefix(s, c.name) { + srvMoveList = append(srvMoveList, s) + break + } + } + + servers := []*Server{ + c.serverByName(srvMoveList[0]), + c.serverByName(srvMoveList[1]), + c.serverByName(srvMoveList[2]), + c.serverByName(srvMoveList[3]), // starts out empty + } + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{InactiveThreshold: time.Hour, AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + ephName := ci.Name + + toSend := uint64(100) + for i := uint64(0); i < toSend; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer ncsys.Close() + + move := func(fromSrv string, toTags ...string) { + sEmpty := c.serverByName(fromSrv) + jszBefore, err := sEmpty.Jsz(nil) + require_NoError(t, err) + require_True(t, jszBefore.Streams == 1) + + moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{ + Server: fromSrv, Tags: toTags}) + require_NoError(t, err) + rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second) + require_NoError(t, err) + var moveResp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp)) + require_True(t, moveResp.Error == nil) + } + + serverEmpty := func(fromSrv string) error { + if jszAfter, err := c.serverByName(fromSrv).Jsz(nil); err != nil { + return fmt.Errorf("could not fetch JS info for server: %v", err) + } else if jszAfter.Streams != 0 { + return fmt.Errorf("empty server still has %d streams", jszAfter.Streams) + } else if jszAfter.Consumers != 0 { + return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers) + } else if jszAfter.Store != 0 { + return fmt.Errorf("empty server still has %d storage", jszAfter.Store) + } + return nil + } + + moveComplete := func(toSrv string, expectedSet ...string) error { + eSet := map[string]int{} + foundInExpected := false + for i, sExpected := range expectedSet { + eSet[sExpected] = i + s := c.serverByName(sExpected) + if !s.JetStreamIsStreamAssigned("$G", "TEST") { + return fmt.Errorf("expected stream to be assigned to %s", sExpected) + } + // test list order invariant + js, cc := s.getJetStreamCluster() + sExpHash := string(getHash(sExpected)) + js.mu.Lock() + if sa, ok := cc.streams["$G"]["TEST"]; !ok { + js.mu.Unlock() + return fmt.Errorf("stream not found in cluster") + } else if len(sa.Group.Peers) != 3 { + js.mu.Unlock() + return fmt.Errorf("peers not reset") + } else if sa.Group.Peers[i] != sExpHash { + js.mu.Unlock() + return fmt.Errorf("stream: expected peer %s on index %d, got %s/%s", + sa.Group.Peers[i], i, sExpHash, sExpected) + } else if ca, ok := sa.consumers["DUR"]; !ok { + js.mu.Unlock() + return fmt.Errorf("durable not found in stream") + } else { + found := false + for _, peer := range ca.Group.Peers { + if peer == sExpHash { + found = true + break + } + } + if !found { + js.mu.Unlock() + return fmt.Errorf("consumer expected peer %s/%s bud didn't find in %+v", + sExpHash, sExpected, ca.Group.Peers) + } + if ephA, ok := sa.consumers[ephName]; ok { + if len(ephA.Group.Peers) != 1 { + return fmt.Errorf("ephemeral peers not reset") + } + foundInExpected = foundInExpected || (ephA.Group.Peers[0] == cc.meta.ID()) + } + } + js.mu.Unlock() + } + if len(expectedSet) > 0 && !foundInExpected { + return fmt.Errorf("ephemeral peer not expected") + } + for _, s := range servers { + if jszAfter, err := c.serverByName(toSrv).Jsz(nil); err != nil { + return fmt.Errorf("could not fetch JS info for server: %v", err) + } else if jszAfter.Messages != toSend { + return fmt.Errorf("messages not yet copied, got %d, expected %d", jszAfter.Messages, toSend) + } + nc, js := jsClientConnect(t, s) + defer nc.Close() + if si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)); err != nil { + return fmt.Errorf("could not fetch stream info: %v", err) + } else if len(si.Cluster.Replicas)+1 != si.Config.Replicas { + return fmt.Errorf("not yet downsized replica should be empty has: %d %s", + len(si.Cluster.Replicas), si.Cluster.Leader) + } else if si.Cluster.Leader == _EMPTY_ { + return fmt.Errorf("leader not found") + } else if len(expectedSet) > 0 { + if _, ok := eSet[si.Cluster.Leader]; !ok { + return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Leader, eSet) + } else if _, ok := eSet[si.Cluster.Replicas[0].Name]; !ok { + return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Replicas[0].Name, eSet) + } else if _, ok := eSet[si.Cluster.Replicas[1].Name]; !ok { + return fmt.Errorf("leader %s not in expected set %+v", si.Cluster.Replicas[1].Name, eSet) + } + } + nc.Close() + } + return nil + } + + moveAndCheck := func(from, to string, expectedSet ...string) { + move(from, to) + checkFor(t, 40*time.Second, 100*time.Millisecond, func() error { return moveComplete(to, expectedSet...) }) + checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return serverEmpty(from) }) + } + + checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { return serverEmpty(srvMoveList[3]) }) + // first iteration establishes order of server 0-2 (the internal order in the server could be 1,0,2) + moveAndCheck(srvMoveList[0], srvMoveList[3]) + moveAndCheck(srvMoveList[1], srvMoveList[0]) + moveAndCheck(srvMoveList[2], srvMoveList[1]) + moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2]) + // second iteration iterates in order + moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[1], srvMoveList[2], srvMoveList[3]) + moveAndCheck(srvMoveList[1], srvMoveList[0], srvMoveList[2], srvMoveList[3], srvMoveList[0]) + moveAndCheck(srvMoveList[2], srvMoveList[1], srvMoveList[3], srvMoveList[0], srvMoveList[1]) + moveAndCheck(srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[1], srvMoveList[2]) + // iterate in the opposite direction and establish order 2-0 + moveAndCheck(srvMoveList[2], srvMoveList[3], srvMoveList[0], srvMoveList[1], srvMoveList[3]) + moveAndCheck(srvMoveList[1], srvMoveList[2], srvMoveList[0], srvMoveList[3], srvMoveList[2]) + moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[1]) + moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) + // move server in the middle of list + moveAndCheck(srvMoveList[1], srvMoveList[3], srvMoveList[2], srvMoveList[0], srvMoveList[3]) + moveAndCheck(srvMoveList[0], srvMoveList[1], srvMoveList[2], srvMoveList[3], srvMoveList[1]) + moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) + // repeatedly use end + moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3]) + moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) + moveAndCheck(srvMoveList[0], srvMoveList[3], srvMoveList[2], srvMoveList[1], srvMoveList[3]) + moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0]) +} + +func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) { + s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, + func(serverName, clusterName, storeDir, conf string) string { + return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName) + }) + defer s.shutdown() + + c := s.clusterForName("C1") + + // Client based API + srv := c.randomNonLeader() + nc, js := jsClientConnect(t, srv) + defer nc.Close() + + test := func(r int, moveTags []string, targetCluster string, testMigrateTo bool, listFrom bool) { + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: r, + }) + require_NoError(t, err) + defer js.DeleteStream("TEST") + startSet := map[string]struct{}{ + si.Cluster.Leader: {}, + } + for _, p := range si.Cluster.Replicas { + startSet[p.Name] = struct{}{} + } + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + sub, err := js.SubscribeSync("foo") + require_NoError(t, err) + + for i := 0; i < 100; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + toMoveFrom := si.Cluster.Leader + if !listFrom { + toMoveFrom = _EMPTY_ + } + sLdr := c.serverByName(si.Cluster.Leader) + jszBefore, err := sLdr.Jsz(nil) + require_NoError(t, err) + require_True(t, jszBefore.Streams == 1) + require_True(t, jszBefore.Consumers >= 1) + require_True(t, jszBefore.Store != 0) + + migrateToServer := _EMPTY_ + if testMigrateTo { + // find an empty server + for _, s := range c.servers { + name := s.Name() + found := si.Cluster.Leader == name + if !found { + for _, r := range si.Cluster.Replicas { + if r.Name == name { + found = true + break + } + } + } + if !found { + migrateToServer = name + break + } + } + jszAfter, err := c.serverByName(migrateToServer).Jsz(nil) + require_NoError(t, err) + require_True(t, jszAfter.Streams == 0) + + moveTags = append(moveTags, fmt.Sprintf("server:%s", migrateToServer)) + } + + ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer ncsys.Close() + + moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{ + Server: toMoveFrom, Tags: moveTags}) + require_NoError(t, err) + rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second) + require_NoError(t, err) + var moveResp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &moveResp)) + require_True(t, moveResp.Error == nil) + + // test move to particular server + if testMigrateTo { + toSrv := c.serverByName(migrateToServer) + checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { + jszAfter, err := toSrv.Jsz(nil) + if err != nil { + return fmt.Errorf("could not fetch JS info for server: %v", err) + } + if jszAfter.Streams != 1 { + return fmt.Errorf("server expected to have one stream, has %d", jszAfter.Streams) + } + return nil + }) + } + // Now wait until the stream is now current. + checkFor(t, 50*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)) + if err != nil { + return fmt.Errorf("could not fetch stream info: %v", err) + } + if si.Cluster.Leader == toMoveFrom { + return fmt.Errorf("peer not removed yet: %+v", toMoveFrom) + } + if si.Cluster.Leader == _EMPTY_ { + return fmt.Errorf("no leader yet") + } + if len(si.Cluster.Replicas) != r-1 { + return fmt.Errorf("not yet downsized replica should be %d has: %d", r-1, len(si.Cluster.Replicas)) + } + if si.Config.Replicas != r { + return fmt.Errorf("bad replica count %d", si.Config.Replicas) + } + if si.Cluster.Name != targetCluster { + return fmt.Errorf("stream expected in %s but found in %s", si.Cluster.Name, targetCluster) + } + sNew := s.serverByName(si.Cluster.Leader) + if jszNew, err := sNew.Jsz(nil); err != nil { + return err + } else if jszNew.Streams != 1 { + return fmt.Errorf("new leader has %d streams, not one", jszNew.Streams) + } else if jszNew.Store != jszBefore.Store { + return fmt.Errorf("new leader has %d storage, should have %d", jszNew.Store, jszBefore.Store) + } + return nil + }) + // test draining + checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error { + if !listFrom { + // when needed determine which server move moved away from + si, err := js.StreamInfo("TEST", nats.MaxWait(2*time.Second)) + require_NoError(t, err) + for n := range startSet { + if n != si.Cluster.Leader { + found := false + for _, p := range si.Cluster.Replicas { + if p.Name == n { + found = true + break + } + } + if !found { + toMoveFrom = n + } + } + } + } + if toMoveFrom == _EMPTY_ { + return fmt.Errorf("server to move away from not found") + } + sEmpty := c.serverByName(toMoveFrom) + jszAfter, err := sEmpty.Jsz(nil) + if err != nil { + return fmt.Errorf("could not fetch JS info for server: %v", err) + } + if jszAfter.Streams != 0 { + return fmt.Errorf("empty server still has %d streams", jszAfter.Streams) + } + if jszAfter.Consumers != 0 { + return fmt.Errorf("empty server still has %d consumers", jszAfter.Consumers) + } + if jszAfter.Store != 0 { + return fmt.Errorf("empty server still has %d storage", jszAfter.Store) + } + return nil + }) + // consume messages from ephemeral consumer + for i := 0; i < 100; i++ { + _, err := sub.NextMsg(time.Second) + require_NoError(t, err) + } + } + + for i := 1; i <= 3; i++ { + t.Run(fmt.Sprintf("r%d", i), func(t *testing.T) { + test(i, nil, "C1", false, true) + }) + t.Run(fmt.Sprintf("r%d-explicit", i), func(t *testing.T) { + test(i, nil, "C1", true, true) + }) + t.Run(fmt.Sprintf("r%d-nosrc", i), func(t *testing.T) { + test(i, nil, "C1", false, false) + }) + } + + t.Run("r3-cluster-move", func(t *testing.T) { + test(3, []string{"cluster:C2"}, "C2", false, false) + }) + t.Run("r3-cluster-move-nosrc", func(t *testing.T) { + test(3, []string{"cluster:C2"}, "C2", false, true) + }) +}