From 7015e46dd9d3058a4b319f8dc086dac8a377d188 Mon Sep 17 00:00:00 2001
From: Matthias Hanel
Date: Wed, 10 Aug 2022 18:48:18 +0200
Subject: [PATCH] fix move cancel issue where tags and peers diverge (#3354)

This can happen if the move was initiated by the user, i.e. by modifying
the placement tags. A subsequent cancel resets the peers to the initial
peer list. The original peer list was picked based on the old set of
tags. A cancel would then keep the new list of tags but reset to the old
peers. Thus tags and peers diverge.

The problem is that, at the time of the cancel, the old placement tags
can't be found anymore.

This fix causes cancel to remove the placement (and with it the tags) if
the old peers do not satisfy the new placement tags.

Signed-off-by: Matthias Hanel
---
 server/jetstream_api.go                | 28 ++++++++++++
 server/jetstream_super_cluster_test.go | 60 ++++++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index b583ead8..4ff700ed 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -2465,6 +2465,34 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli
 
 	peers := currPeers[:cfg.Replicas]
 
+	// Remove placement in case tags don't match.
+	// This can happen if the move was initiated by modifying the tags.
+	// This is an account operation.
+	// This can NOT happen when the move was initiated by the system account.
+	// There, the move honors the original tag list.
+	if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
+	FOR_TAGCHECK:
+		for _, peer := range peers {
+			si, ok := s.nodeToInfo.Load(peer)
+			if !ok {
+				// No node info is known for this peer, so its tags can't be verified: do the safe thing and error.
+				resp.Error = NewJSStreamGeneralError(
+					fmt.Errorf("peer %s not present for tag validation", peer))
+				s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+				return
+			}
+			nodeTags := si.(nodeInfo).tags
+			for _, tag := range cfg.Placement.Tags {
+				if !nodeTags.Contains(tag) {
+					// A restored peer lacks a required tag: clear the placement so it no longer contradicts the peer set.
+					cfg.Placement = nil
+					break FOR_TAGCHECK
+				}
+			}
+
+		}
+	}
+
 	s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
 		cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
 
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index f6ada776..70715e45 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -2652,6 +2652,66 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) {
 	require_True(t, m.Header.Get(JSStream) == "M2")
 }
 
+func TestJetStreamSuperClusterTagInducedMoveCancel(t *testing.T) {
+	server := map[string]struct{}{}
+	sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
+		func(serverName, clusterName, storeDir, conf string) string {
+			server[serverName] = struct{}{}
+			return fmt.Sprintf("%s\nserver_tags: [%s]", conf, clusterName) // tag every server with its cluster name
+		}, nil)
+	defer sc.shutdown()
+
+	// Client based API: connect a regular client to a random non-leader server of a random cluster.
+	c := sc.randomCluster()
+	srv := c.randomNonLeader()
+	nc, js := jsClientConnect(t, srv)
+	defer nc.Close()
+
+	cfg := &nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Placement: &nats.Placement{Tags: []string{"C1"}},
+		Replicas:  3,
+	}
+	siCreate, err := js.AddStream(cfg)
+	require_NoError(t, err)
+	require_Equal(t, siCreate.Cluster.Name, "C1") // the C1 tag must have placed the stream in cluster C1
+
+	toSend := uint64(1_000)
+	for i := uint64(0); i < toSend; i++ {
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
+	}
+
+	ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) // NOTE(review): presumably the system account user - confirm against jsClusterTempl
+	require_NoError(t, err)
+	defer ncsys.Close()
+
+	// Cause a move by altering the placement tags from C1 to C2 through a stream update.
+	cfg.Placement.Tags = []string{"C2"}
+	_, err = js.UpdateStream(cfg)
+	require_NoError(t, err)
+
+	rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamCancelMoveT, "$G", "TEST"), nil, 5*time.Second)
+	require_NoError(t, err)
+	var cancelResp JSApiStreamUpdateResponse
+	require_NoError(t, json.Unmarshal(rmsg.Data, &cancelResp))
+	if cancelResp.Error != nil && ErrorIdentifier(cancelResp.Error.ErrCode) == JSStreamMoveNotInProgress {
+		t.Skip("This can happen with delays, when Move completed before Cancel", cancelResp.Error)
+		return // not reached: t.Skip stops execution of the test
+	}
+	require_True(t, cancelResp.Error == nil)
+
+	checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
+		si, err := js.StreamInfo("TEST")
+		require_NoError(t, err)
+		if si.Config.Placement != nil {
+			return fmt.Errorf("expected placement to be cleared got: %+v", si.Config.Placement)
+		}
+		return nil
+	})
+}
+
 func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
 	server := map[string]struct{}{}
 	sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,