diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b583ead8..4ff700ed 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2465,6 +2465,34 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli peers := currPeers[:cfg.Replicas] + // Remove placement in case tags don't match + // This can happen if the move was initiated by modifying the tags. + // This is an account operation. + // This can NOT happen when the move was initiated by the system account. + // There move honors the original tag list. + if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 { + FOR_TAGCHECK: + for _, peer := range peers { + si, ok := s.nodeToInfo.Load(peer) + if !ok { + // can't verify tags, do the safe thing and error + resp.Error = NewJSStreamGeneralError( + fmt.Errorf("peer %s not present for tag validation", peer)) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + nodeTags := si.(nodeInfo).tags + for _, tag := range cfg.Placement.Tags { + if !nodeTags.Contains(tag) { + // clear placement as tags don't match + cfg.Placement = nil + break FOR_TAGCHECK + } + } + + } + } + s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v", cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index f6ada776..70715e45 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2652,6 +2652,66 @@ func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) { require_True(t, m.Header.Get(JSStream) == "M2") } +func TestJetStreamSuperClusterTagInducedMoveCancel(t *testing.T) { + server := map[string]struct{}{} + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, + func(serverName, clusterName, storeDir, conf string) string { + server[serverName] = struct{}{} + return fmt.Sprintf("%s\nserver_tags: [%s]", conf, clusterName) + }, nil) + defer sc.shutdown() + + // Client based API + c := sc.randomCluster() + srv := c.randomNonLeader() + nc, js := jsClientConnect(t, srv) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Placement: &nats.Placement{Tags: []string{"C1"}}, + Replicas: 3, + } + siCreate, err := js.AddStream(cfg) + require_NoError(t, err) + require_Equal(t, siCreate.Cluster.Name, "C1") + + toSend := uint64(1_000) + for i := uint64(0); i < toSend; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + ncsys, err := nats.Connect(srv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer ncsys.Close() + + // cause a move by altering placement tags + cfg.Placement.Tags = []string{"C2"} + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + + rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamCancelMoveT, "$G", "TEST"), nil, 5*time.Second) + require_NoError(t, err) + var cancelResp JSApiStreamUpdateResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &cancelResp)) + if cancelResp.Error != nil && ErrorIdentifier(cancelResp.Error.ErrCode) == JSStreamMoveNotInProgress { + t.Skip("This can happen with delays, when Move completed before Cancel", cancelResp.Error) + return + } + require_True(t, cancelResp.Error == nil) + + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.Config.Placement != nil { + return fmt.Errorf("expected placement to be cleared got: %+v", si.Config.Placement) + } + return nil + }) +} + func TestJetStreamSuperClusterMoveCancel(t *testing.T) { server := map[string]struct{}{} sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,